1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
/// Entity-count ceiling. Not referenced in the visible part of this file;
/// presumably caps graph traversals — TODO confirm against the traversal code.
const MAX_TRAVERSAL_ENTITIES: usize = 500_000;
/// Relation-count ceiling. Not referenced in the visible part of this file;
/// presumably caps graph traversals — TODO confirm against the traversal code.
const MAX_TRAVERSAL_RELS: usize = 2_000_000;
20
21fn sqlite_err(e: rusqlite::Error) -> MCSError {
22 MCSError::IoError(std::io::Error::other(e))
23}
24
25const fn is_not_found(e: &rusqlite::Error) -> bool {
26 matches!(e, rusqlite::Error::QueryReturnedNoRows)
27}
28
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[inline(always)]
fn now_us() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as i64,
        Err(_) => 0,
    }
}
36
/// 64-bit FNV-1a hash of the UTF-8 bytes of `name`, reinterpreted as `i64`
/// so it can be stored in an SQLite INTEGER column.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    let digest = name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME));
    digest as i64
}
46
47fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
48 let mut stmt = conn
49 .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
50 .map_err(sqlite_err)?;
51 let rows = stmt
52 .query_map(params![entity_id], |row| row.get::<_, String>(0))
53 .map_err(sqlite_err)?
54 .filter_map(|r| r.ok())
55 .collect::<Vec<_>>();
56 Ok(rows)
57}
58
59fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
60 load_observations(conn, entity_id).unwrap_or_default()
61}
62
63fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
64 let h = name_hash(name);
65 let mut stmt = conn
66 .prepare_cached(
67 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
68 )
69 .map_err(sqlite_err)?;
70 match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
71 Ok(id) => Ok(Some(id)),
72 Err(e) if is_not_found(&e) => Ok(None),
73 Err(e) => Err(sqlite_err(e)),
74 }
75}
76
77fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
78 let mut sel = conn
79 .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
80 .map_err(sqlite_err)?;
81 if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
82 return Ok(id);
83 }
84 conn.execute(
85 "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
86 params![kind, type_name],
87 )
88 .map_err(sqlite_err)?;
89 Ok(conn.last_insert_rowid())
90}
91
92fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
96 conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
97 .ok()?
98 .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
99 .ok()
100}
101
102fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
103 conn.execute(
104 "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
105 params![delta, type_id],
106 )
107 .map_err(sqlite_err)?;
108 Ok(())
109}
110
111fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
112 conn.execute(
113 "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
114 params![delta, key],
115 )
116 .map_err(sqlite_err)?;
117 Ok(())
118}
119
120fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
121 conn.query_row(
122 "SELECT value FROM graph_stat WHERE key = ?1",
123 params![key],
124 |row| row.get(0),
125 )
126 .map_err(sqlite_err)
127}
128
129fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
130 conn.query_row(
131 "SELECT name FROM type_dict WHERE id = ?1",
132 params![type_id],
133 |row| row.get(0),
134 )
135 .map_err(sqlite_err)
136}
137
138fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
139 let mut stmt = conn
140 .prepare_cached(
141 "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
142 )
143 .map_err(sqlite_err)?;
144 let rows = stmt
145 .query_map(params![kind], |row| {
146 Ok((
147 row.get::<_, String>(0)?,
148 row.get::<_, i64>(1)? as usize,
149 ))
150 })
151 .map_err(sqlite_err)?
152 .filter_map(|r| r.ok())
153 .collect();
154 Ok(rows)
155}
156
157fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
158 let mut stmt = conn
159 .prepare_cached(
160 "SELECT e.name, t.name,
161 COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
162 FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
163 )
164 .map_err(sqlite_err)?;
165 let (name, etype, obs_json): (String, String, String) = stmt
166 .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
167 .map_err(sqlite_err)?;
168 let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
169 Ok(Entity {
170 name,
171 entity_type: etype,
172 observations,
173 })
174}
175
/// Which relation direction to consider, relative to an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Outgoing,
    Incoming,
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Only the exact upper-case strings `"OUTGOING"` and `"INCOMING"` are
    /// recognised; `None` and every other value map to [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        let Some(keyword) = s else {
            return Self::Both;
        };
        if keyword == "OUTGOING" {
            Self::Outgoing
        } else if keyword == "INCOMING" {
            Self::Incoming
        } else {
            Self::Both
        }
    }
}
193
/// Appends `raw` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` plus the control characters that have a short JSON escape
/// (`\n`, `\r`, `\t`, `\b`, `\f`) are written in their two-character form;
/// every other byte below 0x20 is written as `\u00XX`. All other text,
/// including multi-byte UTF-8, is copied through unchanged.
///
/// Escaping happens in a single pass. The previous two-pass version flushed
/// pending text, including raw control bytes, whenever it met a short-escaped
/// character, so a control byte appearing before e.g. a quote was emitted
/// unescaped and produced invalid JSON.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the not-yet-copied run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        let short: Option<char> = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            // Remaining control bytes have no short form.
            0x00..=0x1F => None,
            _ => continue,
        };
        // `i` always points at an ASCII byte here, so both slice bounds are
        // valid UTF-8 character boundaries.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the `\u00XX` escape (lower-case hex) for the single byte `b`.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail.
    write!(buf, "\\u{:04x}", b).unwrap();
}
234
/// Per-entity bookkeeping mirrored from the `entity` table and held in the
/// in-memory cache keyed by entity name.
#[derive(Clone, Copy)]
struct EntityMeta {
    /// Row id in `entity`.
    id: i64,
    /// Row id of the entity's type in `type_dict`.
    type_id: i64,
    /// Value of the `obs_count` column.
    obs_count: i64,
    /// Value of the `out_deg` column.
    out_deg: i64,
    /// Value of the `in_deg` column.
    in_deg: i64,
}
245
246struct TxGuard<'a> {
249 conn: &'a Connection,
250 done: bool,
251}
252
253impl<'a> TxGuard<'a> {
254 fn begin(conn: &'a Connection) -> Result<Self> {
255 conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
260 Ok(Self { conn, done: false })
261 }
262
263 fn commit(mut self) -> Result<()> {
264 self.done = true;
265 self.conn.execute_batch("COMMIT").map_err(sqlite_err)
266 }
267}
268
269impl Drop for TxGuard<'_> {
270 fn drop(&mut self) {
271 if !self.done {
272 let _ = self.conn.execute_batch("ROLLBACK");
273 }
274 }
275}
276
277struct ReaderPool {
284 conns: Vec<Mutex<Connection>>,
285 next: AtomicUsize,
286}
287
288impl ReaderPool {
289 fn get(&self) -> MutexGuard<'_, Connection> {
293 for c in &self.conns {
294 if let Some(g) = c.try_lock() {
295 return g;
296 }
297 }
298 let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
299 self.conns[i].lock()
300 }
301}
302
/// Handle to the SQLite-backed graph store: one writer connection, a pool of
/// reader connections, in-memory id sequences and a name → metadata cache.
pub struct GraphHandle {
    /// Connection locked by the mutating methods (`create_*`, `delete_*`, `add_observations`).
    writer: Mutex<Connection>,
    /// Connections used by read paths such as `get_entity`.
    readers: ReaderPool,
    /// Last allocated entity id; initialised from `graph_stat.entity_seq`.
    seq_entity: AtomicI64,
    /// Last allocated observation id; initialised from `graph_stat.obs_seq`.
    seq_obs: AtomicI64,
    /// LRU cache from entity name to its `EntityMeta`.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
315
316fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
323 let conn = Connection::open_with_flags(
324 path,
325 OpenFlags::SQLITE_OPEN_READ_WRITE
326 | OpenFlags::SQLITE_OPEN_NO_MUTEX
327 | OpenFlags::SQLITE_OPEN_URI,
328 )
329 .map_err(sqlite_err)?;
330 conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
331 .map_err(sqlite_err)?;
332 conn.execute_batch(&format!(
333 "PRAGMA query_only = ON;
334 PRAGMA cache_size = -{};
335 PRAGMA temp_store = MEMORY;
336 PRAGMA mmap_size = {};",
337 tuning.cache_size_kb, tuning.mmap_size
338 ))
339 .map_err(sqlite_err)?;
340 Ok(conn)
341}
342
343impl GraphHandle {
    /// Opens the database at `path`, applies PRAGMAs, creates the schema if it
    /// is missing, seeds `graph_stat`, and builds the reader pool and cache.
    ///
    /// * `durability` – selects `synchronous = FULL` (`Sync`) or `NORMAL` (`Async`).
    /// * `tuning` – page size, cache size, busy timeout, journal size limit, mmap size.
    /// * `lru_cache_size` – capacity of the name → metadata cache.
    /// * `read_pool_size` – number of reader connections; values below 1 are raised to 1.
    ///
    /// # Errors
    /// Returns an error if the database cannot be opened, if any PRAGMA or
    /// schema statement fails, or if a reader connection cannot be opened.
    pub fn new(
        path: &Path,
        durability: Durability,
        tuning: SqliteTuning,
        lru_cache_size: NonZeroUsize,
        read_pool_size: usize,
    ) -> Result<Self> {
        let conn = Connection::open(path).map_err(sqlite_err)?;
        conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
            .map_err(sqlite_err)?;

        // Issued before WAL mode and before any table is created; presumably
        // because these settings only apply to a fresh database — confirm.
        conn.execute_batch(&format!(
            "PRAGMA page_size = {};
             PRAGMA auto_vacuum = INCREMENTAL;",
            tuning.page_size
        ))
        .map_err(sqlite_err)?;

        // `synchronous = NORMAL` here is a provisional value; it is set again
        // from `durability` further down.
        conn.execute_batch(&format!(
            "PRAGMA journal_mode = WAL;
             PRAGMA foreign_keys = OFF;
             PRAGMA cache_size = -{};
             PRAGMA temp_store = MEMORY;
             PRAGMA busy_timeout = {};
             PRAGMA synchronous = NORMAL;
             PRAGMA journal_size_limit = {};",
            tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
        ))
        .map_err(sqlite_err)?;

        // Schema: every object uses IF NOT EXISTS, so this is a no-op on an
        // already-initialised database.
        // NOTE(review): the obs_fts_bd trigger passes '' as the body to the
        // FTS5 'delete' command rather than old.body — confirm this is
        // intentional for an external-content FTS5 table.
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS entity (
                id          INTEGER PRIMARY KEY,
                name_hash   INTEGER NOT NULL,
                name        TEXT NOT NULL,
                type_id     INTEGER NOT NULL,
                obs_count   INTEGER NOT NULL DEFAULT 0,
                out_deg     INTEGER NOT NULL DEFAULT 0,
                in_deg      INTEGER NOT NULL DEFAULT 0,
                created_us  INTEGER NOT NULL,
                updated_us  INTEGER NOT NULL,
                flags       INTEGER NOT NULL DEFAULT 0
            ) STRICT;

            CREATE INDEX IF NOT EXISTS entity_by_hash
                ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
                WHERE flags = 0;

            CREATE INDEX IF NOT EXISTS entity_name_ci
                ON entity(lower(name))
                WHERE flags = 0;

            CREATE TABLE IF NOT EXISTS observation (
                id          INTEGER PRIMARY KEY,
                entity_id   INTEGER NOT NULL,
                idx         INTEGER NOT NULL,
                body        TEXT NOT NULL,
                created_us  INTEGER NOT NULL
            ) STRICT;

            CREATE INDEX IF NOT EXISTS obs_by_entity
                ON observation(entity_id, idx);

            CREATE TABLE IF NOT EXISTS relation (
                from_id     INTEGER NOT NULL,
                to_id       INTEGER NOT NULL,
                type_id     INTEGER NOT NULL,
                created_us  INTEGER NOT NULL
            ) STRICT;

            CREATE INDEX IF NOT EXISTS rel_out
                ON relation(from_id, type_id, to_id);

            CREATE INDEX IF NOT EXISTS rel_in
                ON relation(to_id, type_id, from_id);

            CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
                USING fts5(name, content='entity', content_rowid='id',
                           tokenize='unicode61 remove_diacritics 2');

            CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
                USING fts5(body, content='observation', content_rowid='id',
                           tokenize='unicode61 remove_diacritics 2');

            CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
                INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
            END;

            CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
                INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
            END;

            CREATE TABLE IF NOT EXISTS type_dict (
                id      INTEGER PRIMARY KEY,
                kind    INTEGER NOT NULL,
                name    TEXT NOT NULL,
                count   INTEGER NOT NULL DEFAULT 0
            ) STRICT;

            CREATE INDEX IF NOT EXISTS type_by_name
                ON type_dict(kind, name);

            CREATE TABLE IF NOT EXISTS graph_stat (
                key     TEXT NOT NULL PRIMARY KEY,
                value   INTEGER NOT NULL
            ) STRICT, WITHOUT ROWID;

            CREATE TABLE IF NOT EXISTS hub_degree (
                entity_id   INTEGER PRIMARY KEY,
                out_deg     INTEGER NOT NULL,
                in_deg      INTEGER NOT NULL
            ) STRICT;

            CREATE TABLE IF NOT EXISTS partition_map (
                table_name  TEXT NOT NULL PRIMARY KEY,
                role        INTEGER NOT NULL,
                type_id     INTEGER,
                row_count   INTEGER NOT NULL DEFAULT 0
            ) STRICT, WITHOUT ROWID;",
        )
        .map_err(sqlite_err)?;

        conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
            .map_err(sqlite_err)?;

        // Final synchronous level, overriding the provisional NORMAL set above.
        let sync_pragma = match durability {
            Durability::Sync => "PRAGMA synchronous = FULL",
            Durability::Async => "PRAGMA synchronous = NORMAL",
        };
        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;

        conn.execute_batch("PRAGMA analysis_limit = 400;")
            .map_err(sqlite_err)?;

        // Seed the counters only when graph_stat yields no row. Any query
        // error is also treated as "no row" here.
        let has_stat: bool = conn
            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
            .is_ok();
        if !has_stat {
            conn.execute_batch(
                "INSERT INTO graph_stat(key, value) VALUES
                    ('entities', 0), ('relations', 0), ('observations', 0),
                    ('entity_seq', 0), ('obs_seq', 0);",
            )
            .map_err(sqlite_err)?;
        }

        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;

        // The id sequences resume from the persisted values and fall back to 0
        // if a value cannot be read.
        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);

        // At least one reader, so `ReaderPool::get` never sees an empty pool.
        let pool_size = read_pool_size.max(1);
        let mut conns = Vec::with_capacity(pool_size);
        for _ in 0..pool_size {
            conns.push(Mutex::new(open_reader(path, &tuning)?));
        }
        let readers = ReaderPool {
            conns,
            next: AtomicUsize::new(0),
        };

        Ok(Self {
            writer: Mutex::new(conn),
            readers,
            seq_entity: AtomicI64::new(seq_entity),
            seq_obs: AtomicI64::new(seq_obs),
            cache: Mutex::new(LruCache::new(lru_cache_size)),
        })
    }
524
525 fn next_entity_id(&self) -> i64 {
526 self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
527 }
528
529 fn next_obs_id(&self) -> i64 {
530 self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
531 }
532
533 fn meta_get(&self, name: &str) -> Option<EntityMeta> {
534 self.cache.lock().get(name).copied()
535 }
536
537 fn meta_set(&self, name: &str, m: EntityMeta) {
538 self.cache.lock().put(name.to_string(), m);
539 }
540
541 fn meta_remove(&self, name: &str) {
542 self.cache.lock().pop(name);
543 }
544
545 fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
546 let mut cache = self.cache.lock();
547 if let Some(m) = cache.get_mut(name) {
548 f(m);
549 }
550 }
551
552 fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
553 if let Some(m) = self.meta_get(name) {
554 return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
555 }
556 let h = name_hash(name);
557 let mut stmt = conn
558 .prepare_cached(
559 "SELECT id, type_id, obs_count, out_deg, in_deg
560 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
561 )
562 .map_err(sqlite_err)?;
563 match stmt.query_row(params![h, name], |row| {
564 Ok(EntityMeta {
565 id: row.get(0)?,
566 type_id: row.get(1)?,
567 obs_count: row.get(2)?,
568 out_deg: row.get(3)?,
569 in_deg: row.get(4)?,
570 })
571 }) {
572 Ok(m) => {
573 self.meta_set(name, m);
574 Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
575 }
576 Err(e) if is_not_found(&e) => Ok(None),
577 Err(e) => Err(sqlite_err(e)),
578 }
579 }
580
581 fn sync_seqs(&self, conn: &Connection) -> Result<()> {
582 let seq_e = self.seq_entity.load(Ordering::Relaxed);
583 let seq_o = self.seq_obs.load(Ordering::Relaxed);
584 conn.execute(
585 "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
586 WHERE key IN ('entity_seq', 'obs_seq')",
587 params![seq_e, seq_o],
588 )
589 .map_err(sqlite_err)?;
590 Ok(())
591 }
592
593 pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
596 if name.is_empty() {
597 return Ok(None);
598 }
599
600 if let Some(m) = self.meta_get(name) {
601 let conn = self.readers.get();
602 let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
603 let observations = load_observations_opt(&conn, m.id);
604 return Ok(Some(Entity {
605 name: name.to_string(),
606 entity_type: etype,
607 observations,
608 }));
609 }
610
611 let conn = self.readers.get();
612 let h = name_hash(name);
613 let mut stmt = conn
614 .prepare_cached(
615 "SELECT e.id, e.type_id, e.name, t.name,
616 e.obs_count, e.out_deg, e.in_deg
617 FROM entity e
618 JOIN type_dict t ON t.id = e.type_id
619 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
620 )
621 .map_err(sqlite_err)?;
622 match stmt.query_row(params![h, name], |row| {
623 let id: i64 = row.get(0)?;
624 let type_id: i64 = row.get(1)?;
625 let ename: String = row.get(2)?;
626 let etype: String = row.get(3)?;
627 let obs_count: i64 = row.get(4)?;
628 let out_deg: i64 = row.get(5)?;
629 let in_deg: i64 = row.get(6)?;
630 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
631 }) {
632 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
633 let observations = load_observations_opt(&conn, id);
634 drop(stmt);
635 drop(conn);
636 self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
637 Ok(Some(Entity {
638 name: ename,
639 entity_type: etype,
640 observations,
641 }))
642 }
643 Err(e) if is_not_found(&e) => Ok(None),
644 Err(e) => Err(sqlite_err(e)),
645 }
646 }
647
648 pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
649 let conn = self.writer.lock();
650 let tx = TxGuard::begin(&conn)?;
651
652 let mut ins_ent = conn
653 .prepare_cached(
654 "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
655 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
656 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
657 )
658 .map_err(sqlite_err)?;
659
660 let mut ins_fts = conn
661 .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
662 .map_err(sqlite_err)?;
663
664 let batch_ts = now_us();
665 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
666 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
667 let mut total_entities: i64 = 0;
668 let mut total_obs: i64 = 0;
669 let mut created = Vec::new();
670 let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
671 let mut obs_sql = String::new();
672
673 for entity in entities {
674 if entity.name.is_empty() {
675 continue;
676 }
677 let h = name_hash(&entity.name);
678 let id = self.next_entity_id();
679 let type_id = match type_cache.get(entity.entity_type.as_str()) {
680 Some(t) => *t,
681 None => {
682 let t = get_type_id(&conn, &entity.entity_type, 0)?;
683 type_cache.insert(entity.entity_type.clone(), t);
684 t
685 }
686 };
687 let obs_count = entity.observations.len() as i64;
688
689 let changed = ins_ent
690 .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
691 .map_err(sqlite_err)?;
692 if changed == 0 {
693 continue;
694 }
695
696 let n = entity.observations.len();
697 if n > 0 {
698 obs_sql.clear();
699
700 let mut oids = Vec::with_capacity(n);
701 let mut idxs = Vec::with_capacity(n);
702 for _ in 0..n {
703 oids.push(self.next_obs_id());
704 }
705 for i in 0..n as i64 {
706 idxs.push(i);
707 }
708
709 obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
710 for i in 0..n {
711 if i > 0 { obs_sql.push(','); }
712 obs_sql.push_str("(?,?,?,?,?)");
713 }
714
715 let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
716 for (i, obs) in entity.observations.iter().enumerate() {
717 obs_params.push(&oids[i]);
718 obs_params.push(&id);
719 obs_params.push(&idxs[i]);
720 obs_params.push(obs);
721 obs_params.push(&batch_ts);
722 }
723
724 conn.execute(&obs_sql, obs_params.as_slice())
725 .map_err(sqlite_err)?;
726 }
727
728 ins_fts
729 .execute(params![id, entity.name])
730 .map_err(sqlite_err)?;
731
732 *type_deltas.entry(type_id).or_insert(0) += 1;
733 total_entities += 1;
734 total_obs += obs_count;
735
736 created.push(entity.clone());
737 created_metas.push((entity.name.clone(), EntityMeta {
738 id,
739 type_id,
740 obs_count,
741 out_deg: 0,
742 in_deg: 0,
743 }));
744 }
745
746 if total_entities > 0 {
747 for (type_id, delta) in &type_deltas {
748 inc_type_count(&conn, *type_id, *delta)?;
749 }
750 inc_graph_stat(&conn, "entities", total_entities)?;
751 inc_graph_stat(&conn, "observations", total_obs)?;
752 self.sync_seqs(&conn)?;
753 }
754
755 tx.commit()?;
756
757 if !created_metas.is_empty() {
762 let mut cache = self.cache.lock();
763 for (name, meta) in &created_metas {
764 cache.put(name.clone(), *meta);
765 }
766 }
767
768 Ok(created)
769 }
770
771 pub fn delete_entities(&self, names: &[String]) -> Result<()> {
772 if names.is_empty() {
773 return Ok(());
774 }
775 let conn = self.writer.lock();
776
777 let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
779 let mut sel = conn
780 .prepare_cached(
781 "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
782 )
783 .map_err(sqlite_err)?;
784 for name in names {
785 let h = name_hash(name);
786 let (id, type_id) = match sel.query_row(params![h, name], |row| {
787 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
788 }) {
789 Ok(v) => v,
790 Err(e) if is_not_found(&e) => continue,
791 Err(e) => return Err(sqlite_err(e)),
792 };
793 resolved.push((id, type_id, name.clone()));
794 }
795
796 if resolved.is_empty() {
797 return Ok(());
798 }
799
800 let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
801 let n = ids.len();
802
803 let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
805 let obs_sql = format!(
806 "DELETE FROM observation WHERE entity_id IN ({})",
807 obs_p.join(",")
808 );
809 let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
810 let obs_deleted = conn
811 .execute(&obs_sql, obs_refs.as_slice())
812 .map_err(sqlite_err)? as i64;
813
814 let rel_sql = format!(
816 "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
817 obs_p.join(","),
818 obs_p.join(",")
819 );
820 let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
821 let rel_deleted = conn
822 .execute(&rel_sql, rel_refs.as_slice())
823 .map_err(sqlite_err)? as i64;
824
825 let fts_values: Vec<String> = (0..n)
827 .map(|_| "('delete', ?, '')".to_string())
828 .collect();
829 let fts_sql = format!(
830 "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
831 fts_values.join(", ")
832 );
833 conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
834 .map_err(sqlite_err)?;
835
836 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
838 for &(_, type_id, _) in &resolved {
839 *type_deltas.entry(type_id).or_insert(0) += 1;
840 }
841
842 if !type_deltas.is_empty() {
844 let m = type_deltas.len();
845 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
846 let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
847 let mut case_parts: Vec<String> = Vec::with_capacity(m);
848 let mut id_parts: Vec<String> = Vec::with_capacity(m);
849 for i in 0..m {
850 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
851 id_parts.push(format!("?{}", i + 1));
852 }
853 let sql = format!(
854 "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
855 case_parts.join(" "),
856 id_parts.join(","),
857 );
858 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
859 for id in &type_keys {
860 params.push(Box::new(*id));
861 }
862 for delta in &type_vals {
863 params.push(Box::new(*delta));
864 }
865 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
866 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
867 }
868
869 conn.execute(
871 &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
872 ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
873 )
874 .map_err(sqlite_err)?;
875
876 inc_graph_stat(&conn, "entities", -(n as i64))?;
878 inc_graph_stat(&conn, "observations", -obs_deleted)?;
879 inc_graph_stat(&conn, "relations", -rel_deleted)?;
880
881 for (_, _, name) in &resolved {
883 self.meta_remove(name);
884 }
885
886 Ok(())
887 }
888
889 #[cfg(feature = "code")]
894 pub fn code_purge_file(&self, rel_path: &str) -> Result<usize> {
895 let defines = self.search_relations(Some(rel_path), None, Some("defines"), Some(crate::code::MAX_SYMBOLS_PER_FILE));
896 let mut names: Vec<String> = defines.into_iter().map(|r| r.to).collect();
897 names.push(rel_path.to_string());
898 let n = names.len();
899 self.delete_entities(&names)?;
900 Ok(n)
901 }
902
903 pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
904 let conn = self.writer.lock();
905 let tx = TxGuard::begin(&conn)?;
906
907 let mut ins = conn
908 .prepare_cached(
909 "INSERT INTO relation (from_id, to_id, type_id, created_us)
910 SELECT ?1, ?2, ?3, ?4
911 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
912 )
913 .map_err(sqlite_err)?;
914
915 let ts = now_us();
916 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
917 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
918 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
919 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
920 let mut total_relations: i64 = 0;
921 let mut created = Vec::new();
922
923 for rel in relations {
924 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
925 Some(v) => v,
926 None => continue,
927 };
928 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
929 Some(v) => v,
930 None => continue,
931 };
932 let type_id = match type_cache.get(rel.relation_type.as_str()) {
933 Some(t) => *t,
934 None => {
935 let t = get_type_id(&conn, &rel.relation_type, 1)?;
936 type_cache.insert(rel.relation_type.clone(), t);
937 t
938 }
939 };
940
941 let changed = ins
942 .execute(params![from_id, to_id, type_id, ts])
943 .map_err(sqlite_err)?;
944 if changed == 0 {
945 continue;
946 }
947
948 *out_deltas.entry(from_id).or_insert(0) += 1;
949 *in_deltas.entry(to_id).or_insert(0) += 1;
950 *type_deltas.entry(type_id).or_insert(0) += 1;
951 total_relations += 1;
952
953 created.push(rel.clone());
954 }
955
956 if total_relations > 0 {
957 for (id, delta) in &out_deltas {
958 conn.execute(
959 "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
960 params![delta, id],
961 )
962 .map_err(sqlite_err)?;
963 }
964 for (id, delta) in in_deltas {
965 conn.execute(
966 "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
967 params![delta, id],
968 )
969 .map_err(sqlite_err)?;
970 }
971 for (type_id, delta) in &type_deltas {
972 inc_type_count(&conn, *type_id, *delta)?;
973 }
974 inc_graph_stat(&conn, "relations", total_relations)?;
975 }
976
977 tx.commit()?;
978
979 if !created.is_empty() {
982 let mut cache = self.cache.lock();
983 for rel in &created {
984 if let Some(m) = cache.get_mut(&rel.from) {
985 m.out_deg += 1;
986 }
987 if let Some(m) = cache.get_mut(&rel.to) {
988 m.in_deg += 1;
989 }
990 }
991 }
992
993 Ok(created)
994 }
995
996 pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
997 if relations.is_empty() {
998 return Ok(());
999 }
1000 let conn = self.writer.lock();
1001
1002 let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
1004 let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
1005 for rel in relations {
1006 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
1007 Some(v) => v,
1008 None => continue,
1009 };
1010 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
1011 Some(v) => v,
1012 None => continue,
1013 };
1014 let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
1015 Ok(id) => id,
1016 Err(_) => continue,
1017 };
1018 triples.push((from_id, to_id, type_id));
1019 names.push((rel.from.clone(), rel.to.clone()));
1020 }
1021
1022 if triples.is_empty() {
1023 return Ok(());
1024 }
1025
1026 let mut sql = String::from(
1028 "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1029 );
1030 for (i, _) in triples.iter().enumerate() {
1031 if i > 0 {
1032 sql.push_str(", ");
1033 }
1034 let base = (i * 3) + 1;
1035 sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1036 }
1037 sql.push(')');
1038
1039 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1040 for &(f, t, tp) in &triples {
1041 param_values.push(Box::new(f));
1042 param_values.push(Box::new(t));
1043 param_values.push(Box::new(tp));
1044 }
1045 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1046 let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1047 if total == 0 {
1048 return Ok(());
1049 }
1050
1051 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1053 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1054 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1055 for &(from_id, to_id, type_id) in &triples {
1056 *out_deltas.entry(from_id).or_insert(0) += 1;
1057 *in_deltas.entry(to_id).or_insert(0) += 1;
1058 *type_deltas.entry(type_id).or_insert(0) += 1;
1059 }
1060
1061 let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1063 let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1064 if !out_keys.is_empty() {
1065 let m = out_keys.len();
1066 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1067 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1068 for i in 0..m {
1069 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1070 id_parts.push(format!("?{}", i + 1));
1071 }
1072 let sql = format!(
1073 "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1074 case_parts.join(" "),
1075 id_parts.join(","),
1076 );
1077 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1078 for id in &out_keys {
1079 params.push(Box::new(*id));
1080 }
1081 for delta in &out_vals {
1082 params.push(Box::new(*delta));
1083 }
1084 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1085 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1086 }
1087
1088 let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1090 let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1091 if !in_keys.is_empty() {
1092 let m = in_keys.len();
1093 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1094 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1095 for i in 0..m {
1096 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1097 id_parts.push(format!("?{}", i + 1));
1098 }
1099 let sql = format!(
1100 "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1101 case_parts.join(" "),
1102 id_parts.join(","),
1103 );
1104 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1105 for id in &in_keys {
1106 params.push(Box::new(*id));
1107 }
1108 for delta in &in_vals {
1109 params.push(Box::new(*delta));
1110 }
1111 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1112 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1113 }
1114
1115 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1117 let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1118 if !type_keys.is_empty() {
1119 let m = type_keys.len();
1120 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1121 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1122 for i in 0..m {
1123 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1124 id_parts.push(format!("?{}", i + 1));
1125 }
1126 let sql = format!(
1127 "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1128 case_parts.join(" "),
1129 id_parts.join(","),
1130 );
1131 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1132 for id in &type_keys {
1133 params.push(Box::new(*id));
1134 }
1135 for delta in &type_vals {
1136 params.push(Box::new(*delta));
1137 }
1138 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1139 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1140 }
1141
1142 inc_graph_stat(&conn, "relations", -(total as i64))?;
1143
1144 for (from, to) in &names {
1147 self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1148 self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1149 }
1150
1151 Ok(())
1152 }
1153
1154 pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1155 let conn = self.writer.lock();
1156 let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1157 Some(v) => v,
1158 None => {
1159 return Err(MCSError::InvalidParams(format!(
1160 "Entity '{entity_name}' not found"
1161 )))
1162 }
1163 };
1164
1165 let mut max_idx: i64 = conn
1166 .query_row(
1167 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1168 params![id],
1169 |row| row.get(0),
1170 )
1171 .map_err(sqlite_err)?;
1172
1173 let ts = now_us();
1174 let mut ins_obs = conn
1175 .prepare_cached(
1176 "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1177 )
1178 .map_err(sqlite_err)?;
1179
1180 for content in contents {
1181 max_idx += 1;
1182 let oid = self.next_obs_id();
1183 ins_obs
1184 .execute(params![oid, id, max_idx, content, ts])
1185 .map_err(sqlite_err)?;
1186 }
1187 let added = contents.to_vec();
1188
1189 let count: i64 = contents.len() as i64;
1190 conn.execute(
1191 "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1192 params![count, ts, id],
1193 )
1194 .map_err(sqlite_err)?;
1195
1196 inc_graph_stat(&conn, "observations", count)?;
1197 self.sync_seqs(&conn)?;
1198
1199 self.meta_update(entity_name, |m| m.obs_count += count);
1200
1201 Ok(added)
1202 }
1203
1204 pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1205 if observations.is_empty() {
1206 return Ok(());
1207 }
1208 let conn = self.writer.lock();
1209 let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1210 Some(v) => v,
1211 None => {
1212 return Err(MCSError::InvalidParams(format!(
1213 "Entity '{entity_name}' not found"
1214 )))
1215 }
1216 };
1217
1218 let placeholders: Vec<String> = (0..observations.len())
1219 .map(|i| format!("?{}", i + 2))
1220 .collect();
1221 let sql = format!(
1222 "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1223 placeholders.join(",")
1224 );
1225
1226 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1227 param_values.push(Box::new(id));
1228 for obs in observations {
1229 param_values.push(Box::new(obs.as_str()));
1230 }
1231 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1232 let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1233
1234 if removed > 0 {
1235 conn.execute(
1236 "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1237 params![removed, now_us(), id],
1238 )
1239 .map_err(sqlite_err)?;
1240 inc_graph_stat(&conn, "observations", -removed)?;
1241
1242 self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1243 }
1244
1245 Ok(())
1246 }
1247
1248 pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1249 let mut results = Vec::new();
1250 for entity in entities {
1251 if let Some(existing) = self.get_entity(&entity.name)? {
1252 if existing.entity_type != entity.entity_type {
1254 let conn = self.writer.lock();
1255 let old_type_id = conn
1256 .query_row(
1257 "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1258 params![name_hash(&entity.name), entity.name],
1259 |row| row.get::<_, i64>(0),
1260 )
1261 .map_err(sqlite_err)?;
1262 let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1263 inc_type_count(&conn, old_type_id, -1)?;
1264 inc_type_count(&conn, new_type_id, 1)?;
1265 conn.execute(
1266 "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1267 params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1268 )
1269 .map_err(sqlite_err)?;
1270 self.meta_remove(&entity.name);
1272 }
1273 let existing_set: HashSet<&str> =
1275 existing.observations.iter().map(|s| s.as_str()).collect();
1276 let to_add: Vec<String> = entity
1277 .observations
1278 .iter()
1279 .filter(|o| !existing_set.contains(o.as_str()))
1280 .cloned()
1281 .collect();
1282 if !to_add.is_empty() {
1283 self.add_observations(&entity.name, &to_add)?;
1284 }
1285 let updated = self
1286 .get_entity(&entity.name)?
1287 .unwrap_or(entity.clone());
1288 results.push(updated);
1289 } else {
1290 let c = self.create_entities(std::slice::from_ref(entity))?;
1291 if let Some(e) = c.into_iter().next() {
1292 results.push(e);
1293 }
1294 }
1295 }
1296 Ok(results)
1297 }
1298
1299 pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1300 let conn = self.writer.lock();
1301 let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1302 Some(v) => v,
1303 None => {
1304 return Err(MCSError::InvalidParams(format!(
1305 "Source entity '{source}' not found"
1306 )))
1307 }
1308 };
1309 let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1310 Some(v) => v,
1311 None => {
1312 return Err(MCSError::InvalidParams(format!(
1313 "Target entity '{target}' not found"
1314 )))
1315 }
1316 };
1317
1318 if src_id == tgt_id {
1319 return self.get_entity(target)?.ok_or_else(|| {
1320 MCSError::InvalidParams("Target entity not found after merge".into())
1321 });
1322 }
1323
1324 let mut obs_count: i64 = 0;
1326 {
1327 let mut max_idx: i64 = conn
1328 .query_row(
1329 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1330 params![tgt_id],
1331 |row| row.get(0),
1332 )
1333 .map_err(sqlite_err)?;
1334 let mut sel_obs = conn
1335 .prepare_cached(
1336 "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1337 )
1338 .map_err(sqlite_err)?;
1339 let mut upd_obs = conn
1340 .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1341 .map_err(sqlite_err)?;
1342 let rows: Vec<(i64, String)> = sel_obs
1343 .query_map(params![src_id], |row| {
1344 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1345 })
1346 .map_err(sqlite_err)?
1347 .filter_map(|r| r.ok())
1348 .collect();
1349 for (oid, _body) in &rows {
1350 max_idx += 1;
1351 upd_obs
1352 .execute(params![tgt_id, max_idx, oid])
1353 .map_err(sqlite_err)?;
1354 obs_count += 1;
1355 }
1356 }
1357
1358 conn.execute(
1360 "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1361 params![tgt_id, src_id],
1362 )
1363 .map_err(sqlite_err)?;
1364 conn.execute(
1365 "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1366 params![tgt_id, src_id],
1367 )
1368 .map_err(sqlite_err)?;
1369 conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1373 .map_err(sqlite_err)?;
1374 conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1375 .map_err(sqlite_err)?;
1376
1377 let out_add: i64 = conn
1379 .query_row(
1380 "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1381 params![tgt_id],
1382 |row| row.get(0),
1383 )
1384 .map_err(sqlite_err)?;
1385 let in_add: i64 = conn
1386 .query_row(
1387 "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1388 params![tgt_id],
1389 |row| row.get(0),
1390 )
1391 .map_err(sqlite_err)?;
1392 conn.execute(
1393 "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1394 params![out_add, in_add, obs_count, now_us(), tgt_id],
1395 )
1396 .map_err(sqlite_err)?;
1397
1398 conn.execute(
1400 "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1401 params![src_id],
1402 )
1403 .map_err(sqlite_err)?;
1404 conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1405 .map_err(sqlite_err)?;
1406
1407 inc_graph_stat(&conn, "entities", -1)?;
1408 self.meta_remove(source);
1409
1410 if let Ok(meta) = conn.query_row(
1412 "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1413 params![tgt_id],
1414 |row| {
1415 Ok(EntityMeta {
1416 id: row.get(0)?,
1417 type_id: row.get(1)?,
1418 obs_count: row.get(2)?,
1419 out_deg: row.get(3)?,
1420 in_deg: row.get(4)?,
1421 })
1422 },
1423 ) {
1424 self.meta_set(target, meta);
1425 }
1426
1427 let (name, etype): (String, String) = conn
1428 .query_row(
1429 "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1430 params![tgt_id],
1431 |row| Ok((row.get(0)?, row.get(1)?)),
1432 )
1433 .map_err(sqlite_err)?;
1434 let observations = load_observations_opt(&conn, tgt_id);
1435
1436 Ok(Entity {
1437 name,
1438 entity_type: etype,
1439 observations,
1440 })
1441 }
1442
1443 pub fn search_nodes_filtered(
1444 &self,
1445 query: &str,
1446 filter_type: Option<&str>,
1447 offset: usize,
1448 limit: usize,
1449 ) -> Vec<Entity> {
1450 if query.is_empty() {
1451 return Vec::new();
1452 }
1453 let conn = self.readers.get();
1454
1455 let mut entity_ids: Vec<i64> = Vec::new();
1457 let mut seen: HashSet<i64> = HashSet::new();
1458
1459 if let Ok(mut stmt) = conn.prepare(
1460 "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1461 ) {
1462 let limit_i64 = (limit + offset) as i64;
1463 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1464 row.get::<_, i64>(0)
1465 }) {
1466 for row in rows.flatten() {
1467 if seen.insert(row) {
1468 entity_ids.push(row);
1469 }
1470 }
1471 }
1472 }
1473
1474 if let Ok(mut stmt) = conn.prepare(
1475 "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1476 WHERE obs_fts MATCH ?1
1477 GROUP BY entity_id
1478 LIMIT ?2",
1479 ) {
1480 let limit_i64 = (limit + offset) as i64;
1481 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1482 row.get::<_, i64>(0)
1483 }) {
1484 for row in rows.flatten() {
1485 if seen.insert(row) {
1486 entity_ids.push(row);
1487 }
1488 }
1489 }
1490 }
1491
1492 let mut results = Vec::new();
1494 let mut count: usize = 0;
1495 for eid in entity_ids {
1496 if let Ok(entity) = entity_by_id(&conn, eid) {
1497 if let Some(ft) = filter_type
1498 && !ft.is_empty() && entity.entity_type != ft {
1499 continue;
1500 }
1501 if count < offset {
1502 count += 1;
1503 continue;
1504 }
1505 if results.len() >= limit {
1506 break;
1507 }
1508 results.push(entity);
1509 count += 1;
1510 }
1511 }
1512
1513 results
1514 }
1515
1516 pub fn read_graph_filtered(
1517 &self,
1518 filter_type: Option<&str>,
1519 offset: usize,
1520 limit: usize,
1521 ) -> Result<String> {
1522 let conn = self.readers.get();
1523
1524 let limit_sql: i64 = if limit == usize::MAX {
1525 -1
1526 } else {
1527 limit.min(i64::MAX as usize) as i64
1528 };
1529 let offset_sql: i64 = offset as i64;
1530
1531 let filter = filter_type.filter(|ft| !ft.is_empty());
1537 let ids: Vec<i64> = if let Some(ft) = filter {
1538 let mut stmt = conn
1539 .prepare_cached(
1540 "SELECT e.id FROM entity e
1541 WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1542 AND e.flags = 0
1543 ORDER BY e.id LIMIT ?2 OFFSET ?3",
1544 )
1545 .map_err(sqlite_err)?;
1546 stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1547 .map_err(sqlite_err)?
1548 .filter_map(|r| r.ok())
1549 .collect()
1550 } else {
1551 let mut stmt = conn
1552 .prepare_cached(
1553 "SELECT e.id FROM entity e WHERE e.flags = 0
1554 ORDER BY e.id LIMIT ?1 OFFSET ?2",
1555 )
1556 .map_err(sqlite_err)?;
1557 stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1558 .map_err(sqlite_err)?
1559 .filter_map(|r| r.ok())
1560 .collect()
1561 };
1562
1563 if ids.is_empty() {
1564 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1565 }
1566
1567 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1568
1569 let entities_json: String = {
1570 let sql = format!(
1571 "SELECT COALESCE(json_group_array(json_object(
1572 'name', e.name,
1573 'entityType', t.name,
1574 'observations', COALESCE((
1575 SELECT json_group_array(o.body ORDER BY o.idx)
1576 FROM observation o WHERE o.entity_id = e.id
1577 ), json('[]'))
1578 ) ORDER BY e.id), json('[]'))
1579 FROM entity e
1580 JOIN type_dict t ON t.id = e.type_id
1581 WHERE e.id IN ({placeholders}) AND e.flags = 0"
1582 );
1583 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1584 row.get::<_, String>(0)
1585 })
1586 .map_err(sqlite_err)?
1587 };
1588
1589 let relations_json: String = {
1590 let sql = format!(
1591 "SELECT COALESCE(json_group_array(json_object(
1592 'from', e1.name,
1593 'to', e2.name,
1594 'relationType', t.name
1595 )), json('[]'))
1596 FROM relation r
1597 JOIN entity e1 ON e1.id = r.from_id
1598 JOIN entity e2 ON e2.id = r.to_id
1599 JOIN type_dict t ON t.id = r.type_id
1600 WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1601 AND e1.flags = 0 AND e2.flags = 0"
1602 );
1603 let all_params: Vec<&dyn ToSql> = ids
1604 .iter()
1605 .map(|id| id as &dyn ToSql)
1606 .chain(ids.iter().map(|id| id as &dyn ToSql))
1607 .collect();
1608 conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1609 .map_err(sqlite_err)?
1610 };
1611
1612 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1613 out.push_str("{\"entities\":");
1614 out.push_str(&entities_json);
1615 out.push_str(",\"relations\":");
1616 out.push_str(&relations_json);
1617 out.push('}');
1618 Ok(out)
1619 }
1620
1621 pub fn open_nodes(&self, names: &[String]) -> String {
1622 let conn = self.readers.get();
1623 let mut entity_ids: Vec<i64> = Vec::new();
1624
1625 for name in names {
1626 let h = name_hash(name);
1627 if let Ok(Some(id)) = conn
1628 .query_row(
1629 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1630 params![h, name],
1631 |row| row.get::<_, i64>(0),
1632 )
1633 .map(Some)
1634 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1635 {
1636 entity_ids.push(id);
1637 }
1638 }
1639
1640 if entity_ids.is_empty() {
1641 return r#"{"entities":[],"relations":[]}"#.to_string();
1642 }
1643
1644 let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1645 let ids_str = placeholders.join(",");
1646
1647 let entities_json: String = {
1648 let sql = format!(
1649 "SELECT COALESCE(json_group_array(json_object(
1650 'name', e.name,
1651 'entityType', t.name,
1652 'observations', COALESCE((
1653 SELECT json_group_array(o.body ORDER BY o.idx)
1654 FROM observation o WHERE o.entity_id = e.id
1655 ), json('[]'))
1656 ) ORDER BY e.id), json('[]'))
1657 FROM entity e
1658 JOIN type_dict t ON t.id = e.type_id
1659 WHERE e.id IN ({ids_str}) AND e.flags = 0"
1660 );
1661 conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1662 row.get::<_, String>(0)
1663 })
1664 .unwrap_or_else(|_| "[]".to_string())
1665 };
1666
1667 let relations_json: String = {
1668 let sql = format!(
1669 "SELECT COALESCE(json_group_array(json_object(
1670 'from', e1.name,
1671 'to', e2.name,
1672 'relationType', t.name
1673 )), json('[]'))
1674 FROM relation r
1675 JOIN entity e1 ON e1.id = r.from_id
1676 JOIN entity e2 ON e2.id = r.to_id
1677 JOIN type_dict t ON t.id = r.type_id
1678 WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1679 AND e1.flags = 0 AND e2.flags = 0"
1680 );
1681 let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1682 .iter()
1683 .map(|id| id as &dyn rusqlite::types::ToSql)
1684 .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1685 .collect();
1686 let mut stmt = conn.prepare(&sql).unwrap();
1687 stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1688 .unwrap_or_else(|_| "[]".to_string())
1689 };
1690
1691 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1692 out.push_str("{\"entities\":");
1693 out.push_str(&entities_json);
1694 out.push_str(",\"relations\":");
1695 out.push_str(&relations_json);
1696 out.push('}');
1697 out
1698 }
1699
1700 pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1701 let conn = self.readers.get();
1702 let mut results = Vec::with_capacity(names.len());
1703 for name in names {
1704 let h = name_hash(name);
1705 let exists: bool = conn
1706 .query_row(
1707 "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1708 params![h, name],
1709 |_| Ok(()),
1710 )
1711 .is_ok();
1712 results.push(exists);
1713 }
1714 Ok(results)
1715 }
1716
1717 pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1718 let conn = self.readers.get();
1719 let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1720 Some(v) => v,
1721 None => {
1722 return Err(MCSError::InvalidParams(format!(
1723 "Entity '{name}' not found"
1724 )))
1725 }
1726 };
1727 Ok(match direction {
1728 Direction::Outgoing => out_d as usize,
1729 Direction::Incoming => in_d as usize,
1730 Direction::Both => (out_d + in_d) as usize,
1731 })
1732 }
1733
1734 pub fn get_entity_count(&self) -> Result<usize> {
1735 let conn = self.readers.get();
1736 read_graph_stat(&conn, "entities")
1737 .map(|v| v as usize)
1738 .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1739 }
1740
1741 pub fn get_relation_count(&self) -> Result<usize> {
1742 let conn = self.readers.get();
1743 read_graph_stat(&conn, "relations")
1744 .map(|v| v as usize)
1745 .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1746 }
1747
1748 pub fn search_relations(
1749 &self,
1750 from: Option<&str>,
1751 to: Option<&str>,
1752 rtype: Option<&str>,
1753 limit: Option<usize>,
1754 ) -> Vec<Relation> {
1755 let conn = self.readers.get();
1756 let mut results = Vec::new();
1757
1758 let from_id = from
1764 .filter(|f| !f.is_empty())
1765 .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1766 let to_id = to
1767 .filter(|t| !t.is_empty())
1768 .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1769 let type_id = rtype
1770 .filter(|rt| !rt.is_empty())
1771 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1772
1773 match (from_id, to_id, type_id) {
1774 (Some(fid), Some(tid), Some(tpid)) => {
1775 if let Ok(mut stmt) = conn.prepare_cached(
1776 "SELECT e1.name, e2.name, t.name
1777 FROM relation r
1778 JOIN entity e1 ON e1.id = r.from_id
1779 JOIN entity e2 ON e2.id = r.to_id
1780 JOIN type_dict t ON t.id = r.type_id
1781 WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1782 AND e1.flags = 0 AND e2.flags = 0
1783 ORDER BY r.from_id, r.to_id"
1784 )
1785 && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1786 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1787 }) {
1788 for row in rows.flatten() { results.push(row); }
1789 }
1790 }
1791 (Some(fid), Some(tid), None) => {
1792 if let Ok(mut stmt) = conn.prepare_cached(
1793 "SELECT e1.name, e2.name, t.name
1794 FROM relation r
1795 JOIN entity e1 ON e1.id = r.from_id
1796 JOIN entity e2 ON e2.id = r.to_id
1797 JOIN type_dict t ON t.id = r.type_id
1798 WHERE r.from_id = ?1 AND r.to_id = ?2
1799 AND e1.flags = 0 AND e2.flags = 0
1800 ORDER BY r.from_id, r.to_id"
1801 )
1802 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1803 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1804 }) {
1805 for row in rows.flatten() { results.push(row); }
1806 }
1807 }
1808 (Some(fid), None, Some(tpid)) => {
1809 if let Ok(mut stmt) = conn.prepare_cached(
1810 "SELECT e1.name, e2.name, t.name
1811 FROM relation r
1812 JOIN entity e1 ON e1.id = r.from_id
1813 JOIN entity e2 ON e2.id = r.to_id
1814 JOIN type_dict t ON t.id = r.type_id
1815 WHERE r.from_id = ?1 AND r.type_id = ?2
1816 AND e1.flags = 0 AND e2.flags = 0
1817 ORDER BY r.from_id, r.to_id"
1818 )
1819 && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1820 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1821 }) {
1822 for row in rows.flatten() { results.push(row); }
1823 }
1824 }
1825 (None, Some(tid), Some(tpid)) => {
1826 if let Ok(mut stmt) = conn.prepare_cached(
1827 "SELECT e1.name, e2.name, t.name
1828 FROM relation r
1829 JOIN entity e1 ON e1.id = r.from_id
1830 JOIN entity e2 ON e2.id = r.to_id
1831 JOIN type_dict t ON t.id = r.type_id
1832 WHERE r.to_id = ?1 AND r.type_id = ?2
1833 AND e1.flags = 0 AND e2.flags = 0
1834 ORDER BY r.from_id, r.to_id"
1835 )
1836 && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1837 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1838 }) {
1839 for row in rows.flatten() { results.push(row); }
1840 }
1841 }
1842 (Some(fid), None, None) => {
1843 if let Ok(mut stmt) = conn.prepare_cached(
1844 "SELECT e1.name, e2.name, t.name
1845 FROM relation r
1846 JOIN entity e1 ON e1.id = r.from_id
1847 JOIN entity e2 ON e2.id = r.to_id
1848 JOIN type_dict t ON t.id = r.type_id
1849 WHERE r.from_id = ?1
1850 AND e1.flags = 0 AND e2.flags = 0
1851 ORDER BY r.from_id, r.to_id"
1852 )
1853 && let Ok(rows) = stmt.query_map(params![fid], |row| {
1854 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1855 }) {
1856 for row in rows.flatten() { results.push(row); }
1857 }
1858 }
1859 (None, Some(tid), None) => {
1860 if let Ok(mut stmt) = conn.prepare_cached(
1861 "SELECT e1.name, e2.name, t.name
1862 FROM relation r
1863 JOIN entity e1 ON e1.id = r.from_id
1864 JOIN entity e2 ON e2.id = r.to_id
1865 JOIN type_dict t ON t.id = r.type_id
1866 WHERE r.to_id = ?1
1867 AND e1.flags = 0 AND e2.flags = 0
1868 ORDER BY r.from_id, r.to_id"
1869 )
1870 && let Ok(rows) = stmt.query_map(params![tid], |row| {
1871 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1872 }) {
1873 for row in rows.flatten() { results.push(row); }
1874 }
1875 }
1876 (None, None, Some(tpid)) => {
1877 if let Ok(mut stmt) = conn.prepare_cached(
1878 "SELECT e1.name, e2.name, t.name
1879 FROM relation r
1880 JOIN entity e1 ON e1.id = r.from_id
1881 JOIN entity e2 ON e2.id = r.to_id
1882 JOIN type_dict t ON t.id = r.type_id
1883 WHERE r.type_id = ?1
1884 AND e1.flags = 0 AND e2.flags = 0
1885 ORDER BY r.from_id, r.to_id"
1886 )
1887 && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1888 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1889 }) {
1890 for row in rows.flatten() { results.push(row); }
1891 }
1892 }
1893 (None, None, None) => {
1894 if let Ok(mut stmt) = conn.prepare_cached(
1895 "SELECT e1.name, e2.name, t.name
1896 FROM relation r
1897 JOIN entity e1 ON e1.id = r.from_id
1898 JOIN entity e2 ON e2.id = r.to_id
1899 JOIN type_dict t ON t.id = r.type_id
1900 WHERE e1.flags = 0 AND e2.flags = 0
1901 ORDER BY r.from_id, r.to_id"
1902 )
1903 && let Ok(rows) = stmt.query_map([], |row| {
1904 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1905 }) {
1906 for row in rows.flatten() { results.push(row); }
1907 }
1908 }
1909 }
1910 if let Some(lim) = limit {
1911 results.truncate(lim);
1912 }
1913 results
1914 }
1915
1916 pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1917 let conn = self.readers.get();
1918 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1919 Some(v) => v,
1920 None => {
1921 return Err(MCSError::InvalidParams(format!(
1922 "Source entity '{from}' not found"
1923 )))
1924 }
1925 };
1926 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1927 Some(v) => v,
1928 None => {
1929 return Err(MCSError::InvalidParams(format!(
1930 "Target entity '{to}' not found"
1931 )))
1932 }
1933 };
1934
1935 if from_id == to_id {
1936 return Ok(Some(vec![from.to_string()]));
1937 }
1938
1939 let mut visited = HashSet::new();
1941 let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1942 let mut queue = VecDeque::new();
1943 visited.insert(from_id);
1944 queue.push_back(from_id);
1945
1946 while let Some(cur) = queue.pop_front() {
1947 if cur == to_id {
1948 break;
1949 }
1950 if let Ok(mut stmt) =
1952 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1953 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1954 for row in rows.flatten() {
1955 if visited.insert(row) {
1956 parent.insert(row, cur);
1957 queue.push_back(row);
1958 }
1959 }
1960 }
1961 if let Ok(mut stmt) =
1963 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1964 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1965 for row in rows.flatten() {
1966 if visited.insert(row) {
1967 parent.insert(row, cur);
1968 queue.push_back(row);
1969 }
1970 }
1971 }
1972 }
1973
1974 if !parent.contains_key(&to_id) && to_id != from_id {
1975 return Ok(None);
1976 }
1977
1978 let mut path = Vec::new();
1979 let mut cur = to_id;
1980 path.push(cur);
1981 while let Some(&p) = parent.get(&cur) {
1982 path.push(p);
1983 cur = p;
1984 if cur == from_id {
1985 break;
1986 }
1987 }
1988 path.reverse();
1989
1990 let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1991 let sql = format!(
1992 "SELECT id, name FROM entity WHERE id IN ({})",
1993 placeholders.join(",")
1994 );
1995 let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1996 && let Ok(rows) = stmt.query_map(
1997 rusqlite::params_from_iter(&path),
1998 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1999 ) {
2000 rows.flatten().collect()
2001 } else {
2002 FxHashMap::default()
2003 };
2004
2005 let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2006
2007 Ok(Some(name_path))
2008 }
2009
2010 pub fn compact(&self) -> Result<()> {
2011 let conn = self.writer.lock();
2012 conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
2013 Ok(())
2014 }
2015
2016 pub fn neighbors(
2017 &self,
2018 name: &str,
2019 direction: Direction,
2020 rtype: Option<&str>,
2021 depth: u32,
2022 ) -> Result<String> {
2023 self._traverse(name, direction, rtype, depth, true)
2024 }
2025
2026 pub fn extract_subgraph(
2027 &self,
2028 names: &[String],
2029 depth: u32,
2030 ) -> Result<String> {
2031 if names.is_empty() {
2032 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2033 }
2034
2035 let conn = self.readers.get();
2036 let mut all_entity_ids: HashSet<i64> = HashSet::new();
2037 let mut frontier: HashSet<i64> = HashSet::new();
2038 let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2039
2040 for name in names {
2042 let h = name_hash(name);
2043 if let Ok(Some(id)) = conn
2044 .query_row(
2045 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2046 params![h, name],
2047 |row| row.get::<_, i64>(0),
2048 )
2049 .map(Some)
2050 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2051 {
2052 all_entity_ids.insert(id);
2053 frontier.insert(id);
2054 }
2055 }
2056
2057 let mut current_depth = 0u32;
2058 while current_depth < depth && !frontier.is_empty() {
2059 let mut next_frontier: HashSet<i64> = HashSet::new();
2060
2061 const CHUNK: usize = 500;
2064 let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2065 for chunk in frontier_ids.chunks(CHUNK) {
2066 let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2067 let in_clause = placeholders.join(",");
2068
2069 if let Ok(mut stmt) = conn.prepare(
2071 &format!(
2072 "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2073 )
2074 )
2075 && let Ok(rows) = stmt.query_map(
2076 rusqlite::params_from_iter(chunk),
2077 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2078 )
2079 {
2080 for row in rows.flatten() {
2081 let (from_id, to_id, type_id) = row;
2082 all_rel_pairs.insert((from_id, to_id, type_id));
2083 if all_entity_ids.insert(to_id) {
2084 next_frontier.insert(to_id);
2085 }
2086 }
2087 }
2088
2089 if let Ok(mut stmt) = conn.prepare(
2091 &format!(
2092 "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2093 )
2094 )
2095 && let Ok(rows) = stmt.query_map(
2096 rusqlite::params_from_iter(chunk),
2097 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2098 )
2099 {
2100 for row in rows.flatten() {
2101 let (from_id, to_id, type_id) = row;
2102 all_rel_pairs.insert((from_id, to_id, type_id));
2103 if all_entity_ids.insert(from_id) {
2104 next_frontier.insert(from_id);
2105 }
2106 }
2107 }
2108 }
2109 if all_entity_ids.len() > MAX_TRAVERSAL_ENTITIES
2110 || all_rel_pairs.len() > MAX_TRAVERSAL_RELS
2111 {
2112 break;
2113 }
2114 frontier = next_frontier;
2115 current_depth += 1;
2116 }
2117
2118 let entities_json: String = {
2119 if all_entity_ids.is_empty() {
2120 "[]".to_string()
2121 } else {
2122 let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2123 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2124 let sql = format!(
2125 "SELECT COALESCE(json_group_array(json_object(
2126 'name', e.name,
2127 'entityType', t.name,
2128 'observations', COALESCE((
2129 SELECT json_group_array(o.body ORDER BY o.idx)
2130 FROM observation o WHERE o.entity_id = e.id
2131 ), json('[]'))
2132 ) ORDER BY e.id), json('[]'))
2133 FROM entity e
2134 JOIN type_dict t ON t.id = e.type_id
2135 WHERE e.id IN ({}) AND e.flags = 0",
2136 placeholders.join(",")
2137 );
2138 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2139 row.get::<_, String>(0)
2140 })
2141 .unwrap_or_else(|_| "[]".to_string())
2142 }
2143 };
2144
2145 let relations_json: String = {
2146 if all_rel_pairs.is_empty() {
2147 "[]".to_string()
2148 } else {
2149 let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2150 let sql = format!(
2151 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2152 SELECT COALESCE(json_group_array(json_object(
2153 'from', e1.name,
2154 'to', e2.name,
2155 'relationType', t.name
2156 )), json('[]'))
2157 FROM r
2158 JOIN entity e1 ON e1.id = r.from_id
2159 JOIN entity e2 ON e2.id = r.to_id
2160 JOIN type_dict t ON t.id = r.type_id
2161 WHERE e1.flags = 0 AND e2.flags = 0",
2162 vals.join(", ")
2163 );
2164 let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2165 .flat_map(|(f, t, tp)| {
2166 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2167 })
2168 .collect();
2169 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2170 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2171 .unwrap_or_else(|_| "[]".to_string())
2172 }
2173 };
2174
2175 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2176 out.push_str("{\"entities\":");
2177 out.push_str(&entities_json);
2178 out.push_str(",\"relations\":");
2179 out.push_str(&relations_json);
2180 out.push('}');
2181 Ok(out)
2182 }
2183
2184 pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2185 self.get_entity(name)?
2186 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2187 }
2188
2189 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2190 let conn = self.readers.get();
2191 select_all_types(&conn, 0).unwrap_or_default()
2192 }
2193
2194 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2195 let conn = self.readers.get();
2196 select_all_types(&conn, 1).unwrap_or_default()
2197 }
2198
2199 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2200 names
2201 .iter()
2202 .map(|n| self.get_entity(n).unwrap_or(None))
2203 .collect()
2204 }
2205
2206 pub fn find_all_paths(
2207 &self,
2208 from: &str,
2209 to: &str,
2210 max_depth: usize,
2211 max_paths: usize,
2212 ) -> Result<Vec<Vec<String>>> {
2213 let conn = self.readers.get();
2214 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2215 Some(v) => v,
2216 None => {
2217 return Err(MCSError::InvalidParams(format!(
2218 "Source entity '{from}' not found"
2219 )))
2220 }
2221 };
2222 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2223 Some(v) => v,
2224 None => {
2225 return Err(MCSError::InvalidParams(format!(
2226 "Target entity '{to}' not found"
2227 )))
2228 }
2229 };
2230
2231 if from_id == to_id {
2232 return Ok(vec![vec![from.to_string()]]);
2233 }
2234
2235 let mut all_paths: Vec<Vec<i64>> = Vec::new();
2237 let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2238 queue.push_back((from_id, vec![from_id]));
2239
2240 const MAX_QUEUE_SIZE: usize = 10_000_000;
2241
2242 while let Some((cur, path)) = queue.pop_front() {
2243 if all_paths.len() >= max_paths {
2244 break;
2245 }
2246 if path.len() > max_depth {
2247 continue;
2248 }
2249
2250 if let Ok(mut stmt) =
2252 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2253 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2254 for next_id in rows.flatten() {
2255 if next_id == to_id {
2256 let mut full_path = path.clone();
2257 full_path.push(next_id);
2258 all_paths.push(full_path);
2259 if all_paths.len() >= max_paths {
2260 break;
2261 }
2262 } else if !path.contains(&next_id) && path.len() < max_depth {
2263 if queue.len() >= MAX_QUEUE_SIZE {
2264 return Err(MCSError::InvalidParams(
2265 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2266 ));
2267 }
2268 let mut new_path = path.clone();
2269 new_path.push(next_id);
2270 queue.push_back((next_id, new_path));
2271 }
2272 }
2273 }
2274
2275 if let Ok(mut stmt) =
2277 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2278 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2279 for next_id in rows.flatten() {
2280 if next_id == to_id {
2281 let mut full_path = path.clone();
2282 full_path.push(next_id);
2283 all_paths.push(full_path);
2284 if all_paths.len() >= max_paths {
2285 break;
2286 }
2287 } else if !path.contains(&next_id) && path.len() < max_depth {
2288 if queue.len() >= MAX_QUEUE_SIZE {
2289 return Err(MCSError::InvalidParams(
2290 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2291 ));
2292 }
2293 let mut new_path = path.clone();
2294 new_path.push(next_id);
2295 queue.push_back((next_id, new_path));
2296 }
2297 }
2298 }
2299 }
2300
2301 let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2303 let id_list: Vec<i64> = all_ids.into_iter().collect();
2304 let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2305 FxHashMap::default()
2306 } else {
2307 let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2308 let sql = format!(
2309 "SELECT id, name FROM entity WHERE id IN ({})",
2310 placeholders.join(",")
2311 );
2312 if let Ok(mut stmt) = conn.prepare(&sql)
2313 && let Ok(rows) = stmt.query_map(
2314 rusqlite::params_from_iter(&id_list),
2315 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2316 ) {
2317 rows.flatten().collect()
2318 } else {
2319 FxHashMap::default()
2320 }
2321 };
2322
2323 let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2324 for path_ids in all_paths {
2325 let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2326 named_paths.push(named);
2327 }
2328
2329 Ok(named_paths)
2330 }
2331
2332 pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2337 let conn = self.readers.get();
2338 conn.query_row(
2341 "SELECT json_object(
2342 'entities', COALESCE((
2343 SELECT json_group_array(json_object(
2344 'name', e.name,
2345 'entityType', t.name,
2346 'observations', COALESCE((
2347 SELECT json_group_array(o.body ORDER BY o.idx)
2348 FROM observation o WHERE o.entity_id = e.id
2349 ), json('[]'))
2350 ) ORDER BY e.id)
2351 FROM (
2352 SELECT id, name, type_id FROM entity
2353 WHERE flags = 0 ORDER BY id LIMIT ?1
2354 ) e
2355 JOIN type_dict t ON t.id = e.type_id
2356 ), json('[]')),
2357 'relations', COALESCE((
2358 SELECT json_group_array(json_object(
2359 'from', e1.name,
2360 'to', e2.name,
2361 'relationType', t.name
2362 ))
2363 FROM (
2364 SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2365 ) r
2366 JOIN entity e1 ON e1.id = r.from_id
2367 JOIN entity e2 ON e2.id = r.to_id
2368 JOIN type_dict t ON t.id = r.type_id
2369 WHERE e1.flags = 0 AND e2.flags = 0
2370 ), json('[]'))
2371 )",
2372 params![max_rows],
2373 |row| row.get::<_, String>(0),
2374 )
2375 .map_err(sqlite_err)
2376 }
2377
2378 pub fn wipe(&self) -> Result<()> {
2379 let conn = self.writer.lock();
2380 conn.execute_batch(
2385 "DELETE FROM observation;
2386 DELETE FROM relation;
2387 DELETE FROM entity;
2388 DELETE FROM type_dict;
2389 INSERT INTO name_fts(name_fts) VALUES('delete-all');
2390 INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2391 UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2392 UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2393 )
2394 .map_err(sqlite_err)?;
2395 self.seq_entity.store(0, Ordering::Relaxed);
2396 self.seq_obs.store(0, Ordering::Relaxed);
2397 self.cache.lock().clear();
2398 Ok(())
2399 }
2400
2401 pub fn run_maintenance(&self) -> Result<()> {
2404 let conn = self.writer.lock();
2405
2406 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2407 .map_err(sqlite_err)?;
2408
2409 conn.execute_batch("PRAGMA optimize(0x10000);")
2410 .map_err(sqlite_err)?;
2411
2412 conn.execute_batch(
2413 "INSERT INTO name_fts(name_fts) VALUES('optimize');
2414 INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2415 )
2416 .map_err(sqlite_err)?;
2417
2418 Ok(())
2419 }
2420
2421 pub fn checkpoint_passive(&self) -> Result<()> {
2425 let conn = self.writer.lock();
2426 conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2427 .map_err(sqlite_err)?;
2428 Ok(())
2429 }
2430
    /// Level-by-level traversal from the entity called `name`, returning the
    /// reached sub-graph as a JSON string of the form
    /// `{"entities":[...],"relations":[...]}`.
    ///
    /// * `direction` – follow outgoing relations, incoming relations, or both.
    /// * `rtype` – optional relation type name; `None` or an empty string
    ///   means "any type".
    /// * `depth` – maximum number of levels to expand from the start entity.
    /// * `_include_relations` – currently unused.
    ///
    /// # Errors
    /// `MCSError::InvalidParams` when the start entity does not exist, plus
    /// any error from `get_entity_id` or from preparing the relations JSON
    /// query. Failures while preparing or running the traversal queries and
    /// the entities JSON query are swallowed (see inline notes).
    fn _traverse(
        &self,
        name: &str,
        direction: Direction,
        rtype: Option<&str>,
        depth: u32,
        _include_relations: bool,
    ) -> Result<String> {
        let conn = self.readers.get();
        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
            Some(v) => v,
            None => {
                return Err(MCSError::InvalidParams(format!(
                    "Entity '{name}' not found"
                )))
            }
        };

        // `all_ids` / `all_rels` accumulate everything reached so far;
        // `frontier` holds the entities discovered in the previous level.
        let mut all_ids: HashSet<i64> = HashSet::new();
        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
        let mut frontier: HashSet<i64> = HashSet::new();
        all_ids.insert(start_id);
        frontier.insert(start_id);

        // A failed type lookup falls back to -1.
        // NOTE(review): presumably -1 matches no `type_id`, so an unknown
        // type yields only the start entity — confirm against the schema.
        let type_filter: Option<i64> = rtype
            .filter(|rt| !rt.is_empty())
            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));

        // All four statements are prepared up front and kept as `Result`s;
        // a statement that failed to prepare is skipped silently in the loop.
        let mut q_out_t = conn.prepare_cached(
            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
        let mut q_out = conn.prepare_cached(
            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
        let mut q_in_t = conn.prepare_cached(
            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
        let mut q_in = conn.prepare_cached(
            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");

        let mut cur_depth = 0u32;
        while cur_depth < depth && !frontier.is_empty() {
            let mut next_frontier: HashSet<i64> = HashSet::new();

            for &fid in &frontier {
                // Outgoing edges: fid -> to_id.
                if direction == Direction::Outgoing || direction == Direction::Both {
                    if let Some(tid) = type_filter {
                        if let Ok(ref mut stmt) = q_out_t
                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
                            }) {
                            for row in rows.flatten() {
                                let (to_id, t_id) = row;
                                all_rels.insert((fid, to_id, t_id));
                                // Only entities seen for the first time are expanded next level.
                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
                            }
                        }
                    } else if let Ok(ref mut stmt) = q_out
                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
                        }) {
                        for row in rows.flatten() {
                            let (to_id, t_id) = row;
                            all_rels.insert((fid, to_id, t_id));
                            if all_ids.insert(to_id) { next_frontier.insert(to_id); }
                        }
                    }
                }

                // Incoming edges: from_id -> fid.
                if direction == Direction::Incoming || direction == Direction::Both {
                    if let Some(tid) = type_filter {
                        if let Ok(ref mut stmt) = q_in_t
                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
                            }) {
                            for row in rows.flatten() {
                                let (from_id, t_id) = row;
                                all_rels.insert((from_id, fid, t_id));
                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
                            }
                        }
                    } else if let Ok(ref mut stmt) = q_in
                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
                        }) {
                        for row in rows.flatten() {
                            let (from_id, t_id) = row;
                            all_rels.insert((from_id, fid, t_id));
                            if all_ids.insert(from_id) { next_frontier.insert(from_id); }
                        }
                    }
                }
            }

            // The size limits are checked once per level, after the whole
            // frontier has been expanded, so a level may overshoot them.
            if all_ids.len() > MAX_TRAVERSAL_ENTITIES || all_rels.len() > MAX_TRAVERSAL_RELS {
                break;
            }

            frontier = next_frontier;
            cur_depth += 1;
        }

        // Entities are rendered to JSON by SQLite; only rows with flags = 0
        // are emitted. Any query failure degrades to an empty array.
        // NOTE(review): one placeholder is bound per id; a large traversal may
        // exceed SQLite's bound-parameter limit and hit the "[]" fallback — confirm.
        let entities_json: String = {
            if all_ids.is_empty() {
                "[]".to_string()
            } else {
                let ids: Vec<i64> = all_ids.iter().copied().collect();
                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
                let sql = format!(
                    "SELECT COALESCE(json_group_array(json_object(
                        'name', e.name,
                        'entityType', t.name,
                        'observations', COALESCE((
                            SELECT json_group_array(o.body ORDER BY o.idx)
                            FROM observation o WHERE o.entity_id = e.id
                        ), json('[]'))
                    ) ORDER BY e.id), json('[]'))
                    FROM entity e
                    JOIN type_dict t ON t.id = e.type_id
                    WHERE e.id IN ({}) AND e.flags = 0",
                    placeholders.join(",")
                );
                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
                    row.get::<_, String>(0)
                })
                .unwrap_or_else(|_| "[]".to_string())
            }
        };

        // Relations are passed back to SQLite as a VALUES table (three bound
        // parameters per relation) and joined to resolve names. A prepare
        // failure is returned to the caller; a query failure degrades to "[]".
        let relations_json: String = {
            if all_rels.is_empty() {
                "[]".to_string()
            } else {
                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
                let sql = format!(
                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
                    SELECT COALESCE(json_group_array(json_object(
                        'from', e1.name,
                        'to', e2.name,
                        'relationType', t.name
                    )), json('[]'))
                    FROM r
                    JOIN entity e1 ON e1.id = r.from_id
                    JOIN entity e2 ON e2.id = r.to_id
                    JOIN type_dict t ON t.id = r.type_id
                    WHERE e1.flags = 0 AND e2.flags = 0",
                    vals.join(", ")
                );
                let params: Vec<&dyn ToSql> = all_rels.iter()
                    .flat_map(|(f, t, tp)| {
                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
                    })
                    .collect();
                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
                    .unwrap_or_else(|_| "[]".to_string())
            }
        };

        // Stitch the two pre-rendered JSON arrays into the final object.
        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
        out.push_str("{\"entities\":");
        out.push_str(&entities_json);
        out.push_str(",\"relations\":");
        out.push_str(&relations_json);
        out.push('}');
        Ok(out)
    }
2605}
2606
2607#[cfg(test)]
2610mod tests {
2611 use super::*;
2612 use serde_json::Value;
2613 use std::ops::Deref;
2614 use std::path::PathBuf;
2615
    /// Test fixture: a `GraphHandle` (field 0) together with the path of its
    /// backing database file (field 1), which is removed when the fixture drops.
    struct TestKg(GraphHandle, PathBuf);
2617
2618 impl Deref for TestKg {
2619 type Target = GraphHandle;
2620 fn deref(&self) -> &GraphHandle {
2621 &self.0
2622 }
2623 }
2624
2625 impl Drop for TestKg {
2626 fn drop(&mut self) {
2627 cleanup_db(&self.1);
2628 }
2629 }
2630
2631 fn cleanup_db(path: &std::path::Path) {
2632 let _ = std::fs::remove_file(path);
2633 let _ = std::fs::remove_file(path.with_extension("db-wal"));
2634 let _ = std::fs::remove_file(path.with_extension("db-shm"));
2635 }
2636
2637 fn new_kg() -> TestKg {
2638 use std::sync::atomic::AtomicU64;
2639 use std::sync::atomic::Ordering;
2640 static COUNTER: AtomicU64 = AtomicU64::new(0);
2641 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2642 let dir = std::env::temp_dir();
2643 let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2644 cleanup_db(&path);
2645 let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2646 TestKg(kg, path)
2647 }
2648
2649 #[test]
2650 fn test_create_and_get_entity() {
2651 let kg = new_kg();
2652 let entities = vec![Entity {
2653 name: "test".into(),
2654 entity_type: "person".into(),
2655 observations: vec!["obs1".into(), "obs2".into()],
2656 }];
2657 let created = kg.create_entities(&entities).unwrap();
2658 assert_eq!(created.len(), 1);
2659
2660 let got = kg.get_entity("test").unwrap().unwrap();
2661 assert_eq!(got.name, "test");
2662 assert_eq!(got.entity_type, "person");
2663 assert_eq!(got.observations, vec!["obs1", "obs2"]);
2664 }
2665
2666 #[test]
2667 fn test_get_nonexistent() {
2668 let kg = new_kg();
2669 assert!(kg.get_entity("nonexistent").unwrap().is_none());
2670 }
2671
2672 #[test]
2673 fn test_delete_entity() {
2674 let kg = new_kg();
2675 kg.create_entities(&[Entity {
2676 name: "del".into(),
2677 entity_type: "t".into(),
2678 observations: vec![],
2679 }])
2680 .unwrap();
2681 assert!(kg.get_entity("del").unwrap().is_some());
2682 kg.delete_entities(&["del".to_string()]).unwrap();
2683 assert!(kg.get_entity("del").unwrap().is_none());
2684 }
2685
2686 #[test]
2687 fn test_add_and_delete_observations() {
2688 let kg = new_kg();
2689 kg.create_entities(&[Entity {
2690 name: "obs_test".into(),
2691 entity_type: "t".into(),
2692 observations: vec!["a".into()],
2693 }])
2694 .unwrap();
2695
2696 let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2697 assert_eq!(added.len(), 2);
2698
2699 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2700 assert!(ent.observations.contains(&"b".into()));
2701 assert!(ent.observations.contains(&"c".into()));
2702
2703 kg.delete_observations("obs_test", &["b".into()]).unwrap();
2704 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2705 assert!(!ent.observations.contains(&"b".into()));
2706 assert!(ent.observations.contains(&"c".into()));
2707 assert!(ent.observations.contains(&"a".into()));
2708 }
2709
2710 #[test]
2711 fn test_create_relations() {
2712 let kg = new_kg();
2713 kg.create_entities(&[
2714 Entity {
2715 name: "A".into(),
2716 entity_type: "node".into(),
2717 observations: vec![],
2718 },
2719 Entity {
2720 name: "B".into(),
2721 entity_type: "node".into(),
2722 observations: vec![],
2723 },
2724 ])
2725 .unwrap();
2726
2727 let rels = kg
2728 .create_relations(&[Relation {
2729 from: "A".into(),
2730 to: "B".into(),
2731 relation_type: "edge".into(),
2732 }])
2733 .unwrap();
2734 assert_eq!(rels.len(), 1);
2735
2736 assert_eq!(kg.get_entity_count().unwrap(), 2);
2737 assert_eq!(kg.get_relation_count().unwrap(), 1);
2738 }
2739
2740 #[test]
2741 fn test_search_nodes() {
2742 let kg = new_kg();
2743 kg.create_entities(&[Entity {
2744 name: "Einstein".into(),
2745 entity_type: "scientist".into(),
2746 observations: vec!["physics".into(), "relativity".into()],
2747 }])
2748 .unwrap();
2749
2750 let results = kg.search_nodes_filtered("physics", None, 0, 10);
2751 assert_eq!(results.len(), 1);
2752 assert_eq!(results[0].name, "Einstein");
2753
2754 let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2755 assert_eq!(results.len(), 1);
2756
2757 let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2758 assert_eq!(results.len(), 0);
2759 }
2760
2761 #[test]
2762 fn test_find_path() {
2763 let kg = new_kg();
2764 kg.create_entities(&[
2765 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2766 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2767 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2768 ]).unwrap();
2769
2770 kg.create_relations(&[
2771 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2772 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2773 ]).unwrap();
2774
2775 let path = kg.find_path("A", "C").unwrap().unwrap();
2776 assert_eq!(path, vec!["A", "B", "C"]);
2777 }
2778
2779 #[test]
2780 fn test_degree() {
2781 let kg = new_kg();
2782 kg.create_entities(&[
2783 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2784 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2785 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2786 ]).unwrap();
2787
2788 kg.create_relations(&[
2789 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2790 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2791 ]).unwrap();
2792
2793 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2794 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2795 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2796 }
2797
2798 #[test]
2799 fn test_neighbors() {
2800 let kg = new_kg();
2801 kg.create_entities(&[
2802 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2803 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2804 ]).unwrap();
2805
2806 kg.create_relations(&[Relation {
2807 from: "A".into(), to: "B".into(), relation_type: "e".into(),
2808 }]).unwrap();
2809
2810 let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2811 let v: Value = serde_json::from_str(&result).unwrap();
2812 assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2813 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2814 }
2815
2816 #[test]
2817 fn test_open_nodes() {
2818 let kg = new_kg();
2819 kg.create_entities(&[
2820 Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2821 Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2822 ]).unwrap();
2823
2824 kg.create_relations(&[Relation {
2825 from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2826 }]).unwrap();
2827
2828 let result = kg.open_nodes(&["X".into()]);
2829 let v: Value = serde_json::from_str(&result).unwrap();
2830 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2831 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2832 }
2833
2834 #[test]
2835 fn test_entities_exist() {
2836 let kg = new_kg();
2837 kg.create_entities(&[Entity {
2838 name: "exists".into(), entity_type: "t".into(), observations: vec![],
2839 }]).unwrap();
2840
2841 let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2842 assert_eq!(res, vec![true, false]);
2843 }
2844
2845 #[test]
2846 fn test_describe_entity() {
2847 let kg = new_kg();
2848 kg.create_entities(&[Entity {
2849 name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2850 }]).unwrap();
2851
2852 let entity = kg.describe_entity("desc").unwrap();
2853 assert_eq!(entity.name, "desc");
2854 }
2855
2856 #[test]
2857 fn test_entity_type_counts() {
2858 let kg = new_kg();
2859 kg.create_entities(&[
2860 Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2861 Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2862 Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2863 ]).unwrap();
2864
2865 let counts = kg.entity_type_counts();
2866 let map: FxHashMap<_, _> = counts.into_iter().collect();
2867 assert_eq!(map.get("person"), Some(&2));
2868 assert_eq!(map.get("place"), Some(&1));
2869 }
2870
2871 #[test]
2872 fn test_relation_type_counts() {
2873 let kg = new_kg();
2874 kg.create_entities(&[
2875 Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2876 Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2877 Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2878 ]).unwrap();
2879
2880 kg.create_relations(&[
2881 Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2882 Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2883 ]).unwrap();
2884
2885 let counts = kg.relation_type_counts();
2886 let map: FxHashMap<_, _> = counts.into_iter().collect();
2887 assert_eq!(map.get("knows"), Some(&2));
2888 }
2889
2890 #[test]
2891 fn test_upsert_entities() {
2892 let kg = new_kg();
2893 kg.create_entities(&[Entity {
2894 name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2895 }]).unwrap();
2896
2897 kg.upsert_entities(&[Entity {
2899 name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2900 }]).unwrap();
2901
2902 let ent = kg.get_entity("u").unwrap().unwrap();
2903 assert_eq!(ent.entity_type, "new");
2904 assert!(ent.observations.contains(&"added".into()));
2905 assert!(ent.observations.contains(&"existing".into()));
2906 }
2907
2908 #[test]
2909 fn test_merge_entities() {
2910 let kg = new_kg();
2911 kg.create_entities(&[
2912 Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2913 Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2914 ]).unwrap();
2915
2916 kg.create_relations(&[Relation {
2917 from: "source".into(), to: "target".into(), relation_type: "e".into(),
2918 }]).unwrap();
2919
2920 let merged = kg.merge_entities("source", "target").unwrap();
2921 assert_eq!(merged.name, "target");
2922 assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2923 }
2924
2925 #[test]
2926 fn test_find_all_paths() {
2927 let kg = new_kg();
2928 kg.create_entities(&[
2929 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2930 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2931 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2932 ]).unwrap();
2933
2934 kg.create_relations(&[
2935 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2936 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2937 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2938 ]).unwrap();
2939
2940 let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2941 assert!(paths.len() >= 2);
2942 }
2943
2944 #[test]
2945 fn test_batch_get_entities() {
2946 let kg = new_kg();
2947 kg.create_entities(&[
2948 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2949 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2950 ]).unwrap();
2951
2952 let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2953 assert_eq!(results.len(), 3);
2954 assert!(results[0].is_some());
2955 assert!(results[1].is_none());
2956 assert!(results[2].is_some());
2957 }
2958
2959 #[test]
2960 fn test_export_graph() {
2961 let kg = new_kg();
2962 kg.create_entities(&[Entity {
2963 name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2964 }]).unwrap();
2965
2966 let exported = kg.export("json", i64::MAX).unwrap();
2967 assert!(exported.contains("exp"));
2968 assert!(exported.contains("o"));
2969 }
2970
2971 #[test]
2972 fn test_graph_stats() {
2973 let kg = new_kg();
2974 assert_eq!(kg.get_entity_count().unwrap(), 0);
2975 assert_eq!(kg.get_relation_count().unwrap(), 0);
2976
2977 kg.create_entities(&[Entity {
2978 name: "s".into(), entity_type: "t".into(), observations: vec![],
2979 }]).unwrap();
2980
2981 assert_eq!(kg.get_entity_count().unwrap(), 1);
2982 }
2983
2984 #[test]
2985 fn test_read_graph_filtered() {
2986 let kg = new_kg();
2987 kg.create_entities(&[
2988 Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2989 Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2990 ]).unwrap();
2991
2992 let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2993 let v: Value = serde_json::from_str(&out).unwrap();
2994 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2995 assert_eq!(v["entities"][0]["name"], "p1");
2996 }
2997
2998 #[test]
2999 fn test_wipe() {
3000 let kg = new_kg();
3001 kg.create_entities(&[Entity {
3002 name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
3003 }]).unwrap();
3004 assert_eq!(kg.get_entity_count().unwrap(), 1);
3005
3006 kg.wipe().unwrap();
3007 assert_eq!(kg.get_entity_count().unwrap(), 0);
3008 }
3009
3010 #[test]
3011 fn test_push_json_str() {
3012 let mut buf = String::new();
3013 push_json_str(&mut buf, "hello");
3014 assert_eq!(buf, "\"hello\"");
3015 let mut buf = String::new();
3016 push_json_str(&mut buf, "he\"llo");
3017 assert_eq!(buf, "\"he\\\"llo\"");
3018 }
3019
3020 #[test]
3023 fn test_create_entities_empty_input() {
3024 let kg = new_kg();
3025 let created = kg.create_entities(&[]).unwrap();
3026 assert!(created.is_empty());
3027 }
3028
3029 #[test]
3030 fn test_create_entities_skip_empty_name() {
3031 let kg = new_kg();
3032 let created = kg.create_entities(&[Entity {
3033 name: "".into(),
3034 entity_type: "t".into(),
3035 observations: vec![],
3036 }])
3037 .unwrap();
3038 assert!(created.is_empty());
3039 assert_eq!(kg.get_entity_count().unwrap(), 0);
3040 }
3041
3042 #[test]
3043 fn test_create_entities_duplicate_names() {
3044 let kg = new_kg();
3045 let e = Entity {
3046 name: "dup".into(),
3047 entity_type: "t".into(),
3048 observations: vec!["obs".into()],
3049 };
3050 let first = kg.create_entities(&[e.clone()]).unwrap();
3051 assert_eq!(first.len(), 1);
3052 let second = kg.create_entities(&[e.clone()]).unwrap();
3053 assert!(second.is_empty());
3054 assert_eq!(kg.get_entity_count().unwrap(), 1);
3055 }
3056
3057 #[test]
3058 fn test_create_entities_partial_duplicates() {
3059 let kg = new_kg();
3060 let created = kg.create_entities(&[
3061 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3062 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3063 ]).unwrap();
3064 assert_eq!(created.len(), 2);
3065
3066 let second = kg.create_entities(&[
3067 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3068 Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3069 ]).unwrap();
3070 assert_eq!(second.len(), 1); assert_eq!(second[0].name, "c");
3072 assert_eq!(kg.get_entity_count().unwrap(), 3);
3073 }
3074
3075 #[test]
3076 fn test_create_entities_mixed_empty_and_valid() {
3077 let kg = new_kg();
3078 let created = kg.create_entities(&[
3079 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3080 Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3081 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3082 ]).unwrap();
3083 assert_eq!(created.len(), 1);
3084 assert_eq!(created[0].name, "valid");
3085 assert_eq!(kg.get_entity_count().unwrap(), 1);
3086 }
3087
3088 #[test]
3089 fn test_create_entities_same_name_in_batch() {
3090 let kg = new_kg();
3091 let created = kg.create_entities(&[
3092 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3093 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3094 ]).unwrap();
3095 assert_eq!(created.len(), 1);
3096 assert_eq!(kg.get_entity_count().unwrap(), 1);
3097 }
3098
3099 #[test]
3102 fn test_create_relations_empty_input() {
3103 let kg = new_kg();
3104 let rels = kg.create_relations(&[]).unwrap();
3105 assert!(rels.is_empty());
3106 }
3107
3108 #[test]
3109 fn test_create_relations_nonexistent_from() {
3110 let kg = new_kg();
3111 kg.create_entities(&[Entity {
3112 name: "B".into(), entity_type: "t".into(), observations: vec![],
3113 }]).unwrap();
3114
3115 let rels = kg.create_relations(&[Relation {
3116 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3117 }]).unwrap();
3118 assert!(rels.is_empty());
3119 assert_eq!(kg.get_relation_count().unwrap(), 0);
3120 }
3121
3122 #[test]
3123 fn test_create_relations_nonexistent_to() {
3124 let kg = new_kg();
3125 kg.create_entities(&[Entity {
3126 name: "A".into(), entity_type: "t".into(), observations: vec![],
3127 }]).unwrap();
3128
3129 let rels = kg.create_relations(&[Relation {
3130 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3131 }]).unwrap();
3132 assert!(rels.is_empty());
3133 assert_eq!(kg.get_relation_count().unwrap(), 0);
3134 }
3135
3136 #[test]
3137 fn test_create_relations_both_nonexistent() {
3138 let kg = new_kg();
3139 let rels = kg.create_relations(&[Relation {
3140 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3141 }]).unwrap();
3142 assert!(rels.is_empty());
3143 }
3144
3145 #[test]
3146 fn test_create_relations_self_loop() {
3147 let kg = new_kg();
3148 kg.create_entities(&[Entity {
3149 name: "self".into(), entity_type: "t".into(), observations: vec![],
3150 }]).unwrap();
3151
3152 let rels = kg.create_relations(&[Relation {
3153 from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3154 }]).unwrap();
3155 assert_eq!(rels.len(), 1);
3156 assert_eq!(kg.get_relation_count().unwrap(), 1);
3157 assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3158 assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3159 }
3160
3161 #[test]
3162 fn test_create_relations_duplicate() {
3163 let kg = new_kg();
3164 kg.create_entities(&[
3165 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3166 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3167 ]).unwrap();
3168
3169 let r = Relation {
3170 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3171 };
3172 let first = kg.create_relations(&[r.clone()]).unwrap();
3173 assert_eq!(first.len(), 1);
3174
3175 let second = kg.create_relations(&[r.clone()]).unwrap();
3176 assert!(second.is_empty());
3177 assert_eq!(kg.get_relation_count().unwrap(), 1);
3178 }
3179
3180 #[test]
3181 fn test_create_relations_new_type_auto_created() {
3182 let kg = new_kg();
3183 kg.create_entities(&[
3184 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3185 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3186 ]).unwrap();
3187
3188 let rels = kg.create_relations(&[Relation {
3189 from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3190 }]).unwrap();
3191 assert_eq!(rels.len(), 1);
3192
3193 let counts = kg.relation_type_counts();
3194 let map: FxHashMap<_, _> = counts.into_iter().collect();
3195 assert_eq!(map.get("brand_new_type"), Some(&1));
3196 }
3197
3198 #[test]
3199 fn test_create_relations_degree_updates() {
3200 let kg = new_kg();
3201 kg.create_entities(&[
3202 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3203 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3204 Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3205 ]).unwrap();
3206
3207 kg.create_relations(&[
3208 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3209 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3210 ]).unwrap();
3211
3212 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3213 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3214 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3215 assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3216 assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3217 }
3218
/// Verifies that a relation can be created again after it has been deleted.
///
/// The relation count is checked after every step: 1 after the first
/// create, 0 after the delete, and 1 again after the re-create, which must
/// also report exactly one created relation.
#[test]
fn test_create_relations_delete_and_recreate() {
    let kg = new_kg();
    kg.create_entities(&[
        Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
        Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
    ])
    .unwrap();

    let r = Relation {
        from: "A".into(),
        to: "B".into(),
        relation_type: "e".into(),
    };
    // All three calls take the same single relation, so borrow it once as a
    // one-element slice instead of cloning it into a temporary array per call.
    let batch = std::slice::from_ref(&r);

    kg.create_relations(batch).unwrap();
    assert_eq!(kg.get_relation_count().unwrap(), 1);

    kg.delete_relations(batch).unwrap();
    assert_eq!(kg.get_relation_count().unwrap(), 0);

    let re = kg.create_relations(batch).unwrap();
    assert_eq!(re.len(), 1);
    assert_eq!(kg.get_relation_count().unwrap(), 1);
}
3241
/// Deleting an entity that participates in a relation must remove both the
/// entity and the relation.
///
/// Builds `A -> B`, confirms one relation exists, deletes `A`, then checks
/// that `A` can no longer be fetched and that the relation count is 0.
#[test]
fn test_create_entities_then_relations_then_delete_entity_with_relations() {
    let kg = new_kg();
    kg.create_entities(&[
        Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
        Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
    ])
    .unwrap();
    kg.create_relations(&[
        Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
    ])
    .unwrap();

    assert_eq!(kg.get_relation_count().unwrap(), 1);

    kg.delete_entities(&["A".into()]).unwrap();
    // Assert the boolean directly rather than comparing it against `true`.
    assert!(kg.get_entity("A").unwrap().is_none());
    assert_eq!(kg.get_relation_count().unwrap(), 0);
}
3262
/// The entity count is 1 after creating an entity that carries three
/// observations and drops back to 0 once that entity is deleted.
#[test]
fn test_graph_stats_after_entity_with_observations() {
    let graph = new_kg();
    let with_observations = Entity {
        name: "stat".into(),
        entity_type: "t".into(),
        observations: vec!["o1".into(), "o2".into(), "o3".into()],
    };
    graph.create_entities(&[with_observations]).unwrap();
    assert_eq!(graph.get_entity_count().unwrap(), 1);

    graph.delete_entities(&["stat".into()]).unwrap();
    let remaining = graph.get_entity_count().unwrap();
    assert_eq!(remaining, 0);
}
3279
3280 fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3283 use std::sync::atomic::AtomicU64;
3284 static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3285 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3286 let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3287 cleanup_db(&path);
3288 let kg = GraphHandle::new(
3289 &path,
3290 Durability::Async,
3291 SqliteTuning::default(),
3292 NonZeroUsize::new(10_000).unwrap(),
3293 read_pool_size,
3294 )
3295 .expect("create KG");
3296 TestKg(kg, path)
3297 }
3298
3299 fn seed_line(kg: &GraphHandle, n: usize) {
3300 let entities: Vec<Entity> = (0..n)
3301 .map(|i| Entity {
3302 name: format!("n{i}"),
3303 entity_type: "node".into(),
3304 observations: vec![format!("obs of n{i}")],
3305 })
3306 .collect();
3307 kg.create_entities(&entities).unwrap();
3308 let rels: Vec<Relation> = (0..n.saturating_sub(1))
3309 .map(|i| Relation {
3310 from: format!("n{i}"),
3311 to: format!("n{}", i + 1),
3312 relation_type: "edge".into(),
3313 })
3314 .collect();
3315 if !rels.is_empty() {
3316 kg.create_relations(&rels).unwrap();
3317 }
3318 }
3319
3320 fn count_relations(graph_json: &str) -> usize {
3321 let v: Value = serde_json::from_str(graph_json).unwrap();
3322 v["relations"].as_array().unwrap().len()
3323 }
3324
3325 fn count_entities(graph_json: &str) -> usize {
3326 let v: Value = serde_json::from_str(graph_json).unwrap();
3327 v["entities"].as_array().unwrap().len()
3328 }
3329
/// With a read pool of size 1, counting, single-entity lookup and a full
/// graph read all still return the five seeded entities.
#[test]
fn test_pool_size_one_still_works() {
    let graph = new_kg_with_pool(1);
    seed_line(&graph, 5);

    let total = graph.get_entity_count().unwrap();
    assert_eq!(total, 5);

    let middle = graph.get_entity("n2").unwrap();
    assert!(middle.is_some());

    let dump = graph.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&dump), 5);
}
3341
/// An entity written through the handle is immediately readable, with its
/// observation intact, when the handle was opened with a read pool of 4.
#[test]
fn test_reads_see_committed_writes() {
    let graph = new_kg_with_pool(4);
    let fresh = Entity {
        name: "fresh".into(),
        entity_type: "t".into(),
        observations: vec!["v".into()],
    };
    graph.create_entities(&[fresh]).unwrap();

    let loaded = graph.get_entity("fresh").unwrap().unwrap();
    assert_eq!(loaded.observations, vec!["v"]);
}
3357
/// Runs eight reader threads alongside one writer thread on a handle with a
/// read pool of 4, then checks the final state.
///
/// Readers discard their results; they only exercise the read paths while
/// the writer adds `w100`..`w159`. Afterwards the count must be the 50
/// seeded entities plus the 60 written ones, and the last written entity
/// must be retrievable.
#[test]
fn test_concurrent_readers_consistent() {
    let graph = new_kg_with_pool(4);
    seed_line(&graph, 50);

    std::thread::scope(|scope| {
        for _reader in 0..8 {
            scope.spawn(|| {
                for _round in 0..200 {
                    let _ = graph.get_entity("n10");
                    let _ = graph.search_nodes_filtered("obs", None, 0, 10);
                    let _ = graph.read_graph_filtered(None, 0, 100);
                    let _ = graph.get_entity_count();
                    let _ = graph.neighbors("n10", Direction::Both, None, 2);
                }
            });
        }

        scope.spawn(|| {
            for idx in 100..160 {
                let written = Entity {
                    name: format!("w{idx}"),
                    entity_type: "node".into(),
                    observations: vec![format!("w obs {idx}")],
                };
                graph.create_entities(&[written]).unwrap();
            }
        });
    });

    assert_eq!(graph.get_entity_count().unwrap(), 110);
    assert!(graph.get_entity("w159").unwrap().is_some());
}
3395
/// Four threads each read the graph 100 times through a read pool of size
/// 1; afterwards the entity count must still be the 3 seeded entities.
///
/// NOTE(review): the name suggests reader connections refuse writes, but
/// nothing here attempts a write through a reader — confirm the intent.
#[test]
fn test_reader_pool_rejects_writes_internally() {
    let graph = new_kg_with_pool(1);
    seed_line(&graph, 3);

    std::thread::scope(|scope| {
        for _reader in 0..4 {
            scope.spawn(|| {
                for _round in 0..100 {
                    let _ = graph.read_graph_filtered(None, 0, 10);
                }
            });
        }
    });

    let total = graph.get_entity_count().unwrap();
    assert_eq!(total, 3);
}
3416
/// The relations returned by `read_graph_filtered` shrink with the entity
/// page on a 4-node chain (3 edges).
///
/// The full read returns 4 entities and 3 relations, a page of one entity
/// returns no relations, and a page of two entities returns one relation.
#[test]
fn test_read_graph_relations_scoped_to_page() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 4);

    let everything = graph.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&everything), 4);
    assert_eq!(count_relations(&everything), 3);

    let first_only = graph.read_graph_filtered(None, 0, 1).unwrap();
    assert_eq!(count_entities(&first_only), 1);
    assert_eq!(count_relations(&first_only), 0);

    let first_two = graph.read_graph_filtered(None, 0, 2).unwrap();
    assert_eq!(count_entities(&first_two), 2);
    assert_eq!(count_relations(&first_two), 1);
}
3442
/// Reading a 5-node chain with offset 2 and a page size of 2 yields exactly
/// `n2` and `n3`; the skipped entities `n0` and `n1` must not appear.
#[test]
fn test_read_graph_pagination_offset() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 5);

    let page = graph.read_graph_filtered(None, 2, 2).unwrap();
    assert_eq!(count_entities(&page), 2);

    // Names are matched with their surrounding JSON quotes.
    assert!(!page.contains("\"n0\""));
    assert!(!page.contains("\"n1\""));
    assert!(page.contains("\"n2\""));
    assert!(page.contains("\"n3\""));
}
3455
/// Reading a graph with no data returns the exact JSON text for two empty
/// arrays.
#[test]
fn test_read_graph_empty() {
    let graph = new_kg_with_pool(2);
    let empty_dump = graph.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(empty_dump, r#"{"entities":[],"relations":[]}"#);
}
3462
/// Passing `Some("person")` as the type filter returns only the two
/// `person` entities and leaves out the `place` entity.
#[test]
fn test_read_graph_filtered_by_type() {
    let graph = new_kg_with_pool(2);
    let typed = |label: &'static str, kind: &'static str| Entity {
        name: label.into(),
        entity_type: kind.into(),
        observations: Vec::new(),
    };
    graph
        .create_entities(&[
            typed("p1", "person"),
            typed("q1", "place"),
            typed("p2", "person"),
        ])
        .unwrap();

    let people = graph.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&people), 2);
    assert!(people.contains("\"p1\""));
    assert!(people.contains("\"p2\""));
    assert!(!people.contains("\"q1\""));
}
3478
/// `export("json", max)` limits its output by the row cap on a 5-node chain.
///
/// With `i64::MAX` all 5 entities and 4 relations come back; with a cap of
/// 2 the export holds 2 entities and 2 relations.
#[test]
fn test_export_respects_max_rows() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 5);

    let uncapped = graph.export("json", i64::MAX).unwrap();
    assert_eq!(count_entities(&uncapped), 5);
    assert_eq!(count_relations(&uncapped), 4);

    let limited = graph.export("json", 2).unwrap();
    assert_eq!(count_entities(&limited), 2);
    assert_eq!(count_relations(&limited), 2);
}
3494
/// A row cap of `-1` does not truncate the export: all three seeded
/// entities are present.
#[test]
fn test_export_negative_max_rows_is_unbounded() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 3);

    let exported = graph.export("json", -1).unwrap();
    assert_eq!(count_entities(&exported), 3);
}
3503
/// Writes 100 entities one per call, then checks that the count is 100 and
/// that searching for `e57` finds the entity with that name.
#[test]
fn test_many_small_write_batches_stay_consistent() {
    let graph = new_kg_with_pool(2);
    for idx in 0..100 {
        let single = Entity {
            name: format!("e{idx}"),
            entity_type: "t".into(),
            observations: vec![format!("o{idx}")],
        };
        graph.create_entities(&[single]).unwrap();
    }

    let total = graph.get_entity_count().unwrap();
    assert_eq!(total, 100);

    let matches = graph.search_nodes_filtered("e57", None, 0, 10);
    assert!(matches.iter().any(|hit| hit.name == "e57"));
}
3523
/// After `wipe`, neither the entity's name nor its observation text can be
/// found by search, and the entity count is 0.
///
/// Both searches return exactly one hit before the wipe, so the empty
/// results afterwards are attributable to the wipe itself.
#[test]
fn test_wipe_clears_name_and_obs_fts() {
    let graph = new_kg_with_pool(2);
    let scientist = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    };
    graph.create_entities(&[scientist]).unwrap();

    let name_hits_before = graph.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(name_hits_before.len(), 1);
    let obs_hits_before = graph.search_nodes_filtered("physics", None, 0, 10);
    assert_eq!(obs_hits_before.len(), 1);

    graph.wipe().unwrap();

    assert_eq!(graph.get_entity_count().unwrap(), 0);
    assert!(graph.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
    assert!(graph.search_nodes_filtered("physics", None, 0, 10).is_empty());
}
3548
/// An entity re-created under the same name after a `wipe` is found exactly
/// once by name and once by its new observation, with an entity count of 1.
#[test]
fn test_wipe_then_recreate_search_works() {
    let graph = new_kg_with_pool(2);
    let einstein = |notes: Vec<String>| Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: notes,
    };

    graph
        .create_entities(&[einstein(vec!["physics".into()])])
        .unwrap();
    graph.wipe().unwrap();

    graph
        .create_entities(&[einstein(vec!["physics".into(), "relativity".into()])])
        .unwrap();

    let name_hits = graph.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(name_hits.len(), 1, "exactly one Einstein after recreate");
    let observation_hits = graph.search_nodes_filtered("relativity", None, 0, 10);
    assert_eq!(observation_hits.len(), 1);
    assert_eq!(graph.get_entity_count().unwrap(), 1);
}
3575
/// Searching relations by a type that was never used returns nothing, and
/// that type is still absent from the relation type counts afterwards.
#[test]
fn test_search_relations_missing_type_returns_empty() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 3);

    let found = graph.search_relations(None, None, Some("does_not_exist"), None);
    assert!(found.is_empty());

    let known_types = graph.relation_type_counts();
    assert!(!known_types.iter().any(|(name, _)| name == "does_not_exist"));
}
3590
/// Searching relations with a `from` name that matches no entity returns an
/// empty result rather than every relation.
#[test]
fn test_search_relations_missing_from_returns_empty() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 3);

    let found = graph.search_relations(Some("ghost"), None, None, None);
    assert!(found.is_empty(), "missing 'from' must not match every relation");
}
3598
/// Filtering relations by an existing source (`n0`) and an existing type
/// (`edge`) returns the single matching relation `n0 -> n1`.
#[test]
fn test_search_relations_existing_filters_still_work() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 3);

    let found = graph.search_relations(Some("n0"), None, Some("edge"), None);
    assert_eq!(found.len(), 1);

    let only = &found[0];
    assert_eq!(only.from, "n0");
    assert_eq!(only.to, "n1");
}
3608
/// Asking for neighbours of `n0` restricted to a relation type that does
/// not exist yields one entity and no relations.
#[test]
fn test_neighbors_missing_type_returns_only_start() {
    let graph = new_kg_with_pool(2);
    seed_line(&graph, 3);

    let neighbourhood = graph
        .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
        .unwrap();

    assert_eq!(count_entities(&neighbourhood), 1);
    assert_eq!(count_relations(&neighbourhood), 0);
}
3620
/// With `a -knows-> b` and `a -likes-> c`, outgoing neighbours of `a`
/// restricted to `knows` include `b`, exclude `c`, and carry one relation.
#[test]
fn test_neighbors_existing_type_filters() {
    let graph = new_kg_with_pool(2);
    let node = |label: &str| Entity {
        name: label.into(),
        entity_type: "n".into(),
        observations: Vec::new(),
    };
    let edge_from_a = |target: &str, kind: &'static str| Relation {
        from: "a".into(),
        to: target.into(),
        relation_type: kind.into(),
    };

    graph
        .create_entities(&[node("a"), node("b"), node("c")])
        .unwrap();
    graph
        .create_relations(&[edge_from_a("b", "knows"), edge_from_a("c", "likes")])
        .unwrap();

    let neighbourhood = graph
        .neighbors("a", Direction::Outgoing, Some("knows"), 1)
        .unwrap();
    assert!(neighbourhood.contains("\"b\""));
    assert!(!neighbourhood.contains("\"c\""));
    assert_eq!(count_relations(&neighbourhood), 1);
}
3640
/// Opens a fresh database with a non-default `page_size` and then inspects
/// the file through an independent connection.
///
/// The probe connection must report a page size of 8192, an `auto_vacuum`
/// value of 2, and a journal mode of `wal` (compared case-insensitively).
#[test]
fn test_sqlite_tuning_applied_to_fresh_db() {
    static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(2_000_000);

    let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
    let db_name = format!("kg_tuning_{}_{}.db", std::process::id(), seq);
    let path = std::env::temp_dir().join(db_name);
    cleanup_db(&path);

    let tuning = SqliteTuning {
        page_size: 8192,
        ..SqliteTuning::default()
    };
    let handle = GraphHandle::new(
        &path,
        Durability::Async,
        tuning,
        NonZeroUsize::new(64).unwrap(),
        2,
    )
    .expect("create KG");
    let kg = TestKg(handle, path.clone());

    // Write one entity before probing the file.
    let sample = Entity {
        name: "a".into(),
        entity_type: "n".into(),
        observations: vec!["o".into()],
    };
    kg.create_entities(&[sample]).unwrap();

    let probe = Connection::open(&path).unwrap();

    let page_size: i64 = probe
        .query_row("PRAGMA page_size", [], |row| row.get(0))
        .unwrap();
    assert_eq!(page_size, 8192);

    let auto_vacuum: i64 = probe
        .query_row("PRAGMA auto_vacuum", [], |row| row.get(0))
        .unwrap();
    assert_eq!(auto_vacuum, 2, "expected INCREMENTAL auto_vacuum");

    let journal_mode: String = probe
        .query_row("PRAGMA journal_mode", [], |row| row.get(0))
        .unwrap();
    assert_eq!(journal_mode.to_lowercase(), "wal");
}
3675
/// `checkpoint_passive` succeeds on an empty database, after a write, and
/// when called twice in a row; the written entity stays readable.
#[test]
fn test_checkpoint_passive_is_noop_safe() {
    let graph = new_kg();

    // Nothing has been written yet.
    graph.checkpoint_passive().unwrap();

    let sample = Entity {
        name: "a".into(),
        entity_type: "n".into(),
        observations: vec!["o".into()],
    };
    graph.create_entities(&[sample]).unwrap();

    // Once after the write, then again with no writes in between.
    graph.checkpoint_passive().unwrap();
    graph.checkpoint_passive().unwrap();

    assert!(graph.get_entity("a").unwrap().is_some());
}
3693}