1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::Durability;
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17 MCSError::IoError(std::io::Error::other(e.to_string()))
18}
19
20fn is_not_found(e: &rusqlite::Error) -> bool {
21 matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// is treated as zero and `0` is returned.
#[inline(always)]
fn now_us() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_micros() as i64
}
31
/// Hashes `name` with 64-bit FNV-1a and reinterprets the result as `i64`.
///
/// Each byte is XOR-ed into the running hash, which is then multiplied by the
/// FNV prime with wrapping arithmetic. The final `u64` is cast bit-for-bit to
/// `i64`, so the result may be negative.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    let digest = name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ byte as u64).wrapping_mul(PRIME));
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43 let mut stmt = conn
44 .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45 .map_err(sqlite_err)?;
46 let rows = stmt
47 .query_map(params![entity_id], |row| row.get::<_, String>(0))
48 .map_err(sqlite_err)?
49 .filter_map(|r| r.ok())
50 .collect::<Vec<_>>();
51 Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55 load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59 let h = name_hash(name);
60 let mut stmt = conn
61 .prepare_cached(
62 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63 )
64 .map_err(sqlite_err)?;
65 match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66 Ok(id) => Ok(Some(id)),
67 Err(e) if is_not_found(&e) => Ok(None),
68 Err(e) => Err(sqlite_err(e)),
69 }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73 let mut sel = conn
74 .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75 .map_err(sqlite_err)?;
76 if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77 return Ok(id);
78 }
79 conn.execute(
80 "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81 params![kind, type_name],
82 )
83 .map_err(sqlite_err)?;
84 Ok(conn.last_insert_rowid())
85}
86
87fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91 conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92 .ok()?
93 .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94 .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98 conn.execute(
99 "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100 params![delta, type_id],
101 )
102 .map_err(sqlite_err)?;
103 Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107 conn.execute(
108 "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109 params![delta, key],
110 )
111 .map_err(sqlite_err)?;
112 Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116 conn.query_row(
117 "SELECT value FROM graph_stat WHERE key = ?1",
118 params![key],
119 |row| row.get(0),
120 )
121 .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125 conn.query_row(
126 "SELECT name FROM type_dict WHERE id = ?1",
127 params![type_id],
128 |row| row.get(0),
129 )
130 .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134 let mut stmt = conn
135 .prepare_cached(
136 "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137 )
138 .map_err(sqlite_err)?;
139 let rows = stmt
140 .query_map(params![kind], |row| {
141 Ok((
142 row.get::<_, String>(0)?,
143 row.get::<_, i64>(1)? as usize,
144 ))
145 })
146 .map_err(sqlite_err)?
147 .filter_map(|r| r.ok())
148 .collect();
149 Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153 let mut stmt = conn
154 .prepare_cached(
155 "SELECT e.name, t.name,
156 COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157 FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158 )
159 .map_err(sqlite_err)?;
160 let (name, etype, obs_json): (String, String, String) = stmt
161 .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162 .map_err(sqlite_err)?;
163 let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164 Ok(Entity {
165 name,
166 entity_type: etype,
167 observations,
168 })
169}
170
/// Which relation edges to consider relative to an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Edges that start at the entity.
    Outgoing,
    /// Edges that end at the entity.
    Incoming,
    /// Edges in either direction.
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Exactly `"OUTGOING"` and `"INCOMING"` (case-sensitive) select the
    /// corresponding variant; `None` and every other string yield
    /// `Direction::Both`.
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or("") {
            "OUTGOING" => Self::Outgoing,
            "INCOMING" => Self::Incoming,
            _ => Self::Both,
        }
    }
}
188
/// Appends `raw` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped; newline, carriage return, tab,
/// backspace and form feed use their short escapes (`\n`, `\r`, `\t`, `\b`,
/// `\f`); every other byte below `0x20` is written as `\u00XX`. All remaining
/// bytes, including multi-byte UTF-8 sequences, are copied unchanged.
///
/// Escaping is done in a single pass, so a control character is escaped
/// regardless of where it sits relative to other escaped characters.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the not-yet-copied run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        // `Some(c)` selects the two-character escape `\c`; `None` selects the
        // `\u00XX` form; anything else is left in the pending run.
        let short = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            0x00..=0x1F => None,
            _ => continue,
        };
        // Every escaped byte is ASCII, so `i` and `i + 1` are always valid
        // char boundaries for slicing.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the six-character JSON escape `\u00XX` for byte `b`, using
/// lowercase hexadecimal digits.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Writing into a `String` cannot fail.
    write!(buf, "\\u{:04x}", b).unwrap();
}
229
/// Compact per-entity record kept in the in-memory cache.
///
/// The fields mirror the `id`, `type_id`, `obs_count`, `out_deg` and `in_deg`
/// columns of the `entity` table.
#[derive(Clone, Copy)]
struct EntityMeta {
    /// Row id of the entity.
    id: i64,
    /// Id of the entity's type in `type_dict`.
    type_id: i64,
    /// Number of observations recorded for the entity.
    obs_count: i64,
    /// Outgoing relation count.
    out_deg: i64,
    /// Incoming relation count.
    in_deg: i64,
}
240
241struct TxGuard<'a> {
244 conn: &'a Connection,
245 done: bool,
246}
247
248impl<'a> TxGuard<'a> {
249 fn begin(conn: &'a Connection) -> Result<Self> {
250 conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255 Ok(Self { conn, done: false })
256 }
257
258 fn commit(mut self) -> Result<()> {
259 self.done = true;
260 self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261 }
262}
263
264impl Drop for TxGuard<'_> {
265 fn drop(&mut self) {
266 if !self.done {
267 let _ = self.conn.execute_batch("ROLLBACK");
268 }
269 }
270}
271
272struct ReaderPool {
279 conns: Vec<Mutex<Connection>>,
280 next: AtomicUsize,
281}
282
283impl ReaderPool {
284 fn get(&self) -> MutexGuard<'_, Connection> {
288 for c in &self.conns {
289 if let Some(g) = c.try_lock() {
290 return g;
291 }
292 }
293 let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294 self.conns[i].lock()
295 }
296}
297
/// Handle to the SQLite-backed graph store.
pub struct GraphHandle {
    /// Connection used by the mutating methods; they lock it for the whole
    /// operation.
    writer: Mutex<Connection>,
    /// Pool of connections used by read paths such as `get_entity`.
    readers: ReaderPool,
    /// Last entity id handed out; initialised from the `entity_seq` stat.
    seq_entity: AtomicI64,
    /// Last observation id handed out; initialised from the `obs_seq` stat.
    seq_obs: AtomicI64,
    /// LRU cache from entity name to its `EntityMeta`.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
310
311fn open_reader(path: &Path, mmap_size: i64) -> Result<Connection> {
318 let conn = Connection::open_with_flags(
319 path,
320 OpenFlags::SQLITE_OPEN_READ_WRITE
321 | OpenFlags::SQLITE_OPEN_NO_MUTEX
322 | OpenFlags::SQLITE_OPEN_URI,
323 )
324 .map_err(sqlite_err)?;
325 conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
326 conn.execute_batch(
327 "PRAGMA query_only = ON;
328 PRAGMA cache_size = -50000;
329 PRAGMA temp_store = MEMORY;",
330 )
331 .map_err(sqlite_err)?;
332 conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
333 .map_err(sqlite_err)?;
334 Ok(conn)
335}
336
337impl GraphHandle {
338 pub fn new(
339 path: &Path,
340 durability: Durability,
341 mmap_size: i64,
342 lru_cache_size: NonZeroUsize,
343 read_pool_size: usize,
344 ) -> Result<Self> {
345 {
346 let tmp = Connection::open(path).map_err(sqlite_err)?;
347 tmp.execute_batch("PRAGMA page_size = 16384;")
348 .map_err(sqlite_err)?;
349 drop(tmp);
350 }
351
352 let conn = Connection::open(path).map_err(sqlite_err)?;
353 conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
356
357 conn.execute_batch(
358 "PRAGMA journal_mode = WAL;
359 PRAGMA foreign_keys = OFF;
360 PRAGMA cache_size = -50000;
361 PRAGMA temp_store = MEMORY;
362 PRAGMA busy_timeout = 5000;
363 PRAGMA synchronous = NORMAL;
364 PRAGMA journal_size_limit = 67108864;
365
366 CREATE TABLE IF NOT EXISTS entity (
367 id INTEGER PRIMARY KEY,
368 name_hash INTEGER NOT NULL,
369 name TEXT NOT NULL,
370 type_id INTEGER NOT NULL,
371 obs_count INTEGER NOT NULL DEFAULT 0,
372 out_deg INTEGER NOT NULL DEFAULT 0,
373 in_deg INTEGER NOT NULL DEFAULT 0,
374 created_us INTEGER NOT NULL,
375 updated_us INTEGER NOT NULL,
376 flags INTEGER NOT NULL DEFAULT 0
377 ) STRICT;
378
379 CREATE INDEX IF NOT EXISTS entity_by_hash
380 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
381 WHERE flags = 0;
382
383 CREATE INDEX IF NOT EXISTS entity_name_ci
384 ON entity(lower(name))
385 WHERE flags = 0;
386
387 CREATE TABLE IF NOT EXISTS observation (
388 id INTEGER PRIMARY KEY,
389 entity_id INTEGER NOT NULL,
390 idx INTEGER NOT NULL,
391 body TEXT NOT NULL,
392 created_us INTEGER NOT NULL
393 ) STRICT;
394
395 CREATE INDEX IF NOT EXISTS obs_by_entity
396 ON observation(entity_id, idx);
397
398 CREATE TABLE IF NOT EXISTS relation (
399 from_id INTEGER NOT NULL,
400 to_id INTEGER NOT NULL,
401 type_id INTEGER NOT NULL,
402 created_us INTEGER NOT NULL
403 ) STRICT;
404
405 CREATE INDEX IF NOT EXISTS rel_out
406 ON relation(from_id, type_id, to_id);
407
408 CREATE INDEX IF NOT EXISTS rel_in
409 ON relation(to_id, type_id, from_id);
410
411 CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
412 USING fts5(name, content='entity', content_rowid='id',
413 tokenize='unicode61 remove_diacritics 2');
414
415 CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
416 USING fts5(body, content='observation', content_rowid='id',
417 tokenize='unicode61 remove_diacritics 2');
418
419 CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
420 INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
421 END;
422
423 CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
424 INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
425 END;
426
427 CREATE TABLE IF NOT EXISTS type_dict (
428 id INTEGER PRIMARY KEY,
429 kind INTEGER NOT NULL,
430 name TEXT NOT NULL,
431 count INTEGER NOT NULL DEFAULT 0
432 ) STRICT;
433
434 CREATE INDEX IF NOT EXISTS type_by_name
435 ON type_dict(kind, name);
436
437 CREATE TABLE IF NOT EXISTS graph_stat (
438 key TEXT NOT NULL PRIMARY KEY,
439 value INTEGER NOT NULL
440 ) STRICT, WITHOUT ROWID;
441
442 CREATE TABLE IF NOT EXISTS hub_degree (
443 entity_id INTEGER PRIMARY KEY,
444 out_deg INTEGER NOT NULL,
445 in_deg INTEGER NOT NULL
446 ) STRICT;
447
448 CREATE TABLE IF NOT EXISTS partition_map (
449 table_name TEXT NOT NULL PRIMARY KEY,
450 role INTEGER NOT NULL,
451 type_id INTEGER,
452 row_count INTEGER NOT NULL DEFAULT 0
453 ) STRICT, WITHOUT ROWID;",
454 )
455 .map_err(sqlite_err)?;
456
457 conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
458 .map_err(sqlite_err)?;
459
460 let sync_pragma = match durability {
461 Durability::Sync => "PRAGMA synchronous = FULL",
462 Durability::Async => "PRAGMA synchronous = NORMAL",
463 };
464 conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
465
466 let has_stat: bool = conn
467 .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
468 .is_ok();
469 if !has_stat {
470 conn.execute_batch(
471 "INSERT INTO graph_stat(key, value) VALUES
472 ('entities', 0), ('relations', 0), ('observations', 0),
473 ('entity_seq', 0), ('obs_seq', 0);",
474 )
475 .map_err(sqlite_err)?;
476 }
477
478 conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
479
480 let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
481 let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
482
483 let pool_size = read_pool_size.max(1);
486 let mut conns = Vec::with_capacity(pool_size);
487 for _ in 0..pool_size {
488 conns.push(Mutex::new(open_reader(path, mmap_size)?));
489 }
490 let readers = ReaderPool {
491 conns,
492 next: AtomicUsize::new(0),
493 };
494
495 Ok(Self {
496 writer: Mutex::new(conn),
497 readers,
498 seq_entity: AtomicI64::new(seq_entity),
499 seq_obs: AtomicI64::new(seq_obs),
500 cache: Mutex::new(LruCache::new(lru_cache_size)),
501 })
502 }
503
504 fn next_entity_id(&self) -> i64 {
505 self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
506 }
507
508 fn next_obs_id(&self) -> i64 {
509 self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
510 }
511
512 fn meta_get(&self, name: &str) -> Option<EntityMeta> {
513 self.cache.lock().get(name).copied()
514 }
515
516 fn meta_set(&self, name: &str, m: EntityMeta) {
517 self.cache.lock().put(name.to_string(), m);
518 }
519
520 fn meta_remove(&self, name: &str) {
521 self.cache.lock().pop(name);
522 }
523
524 fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
525 let mut cache = self.cache.lock();
526 if let Some(m) = cache.get_mut(name) {
527 f(m);
528 }
529 }
530
531 fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
532 if let Some(m) = self.meta_get(name) {
533 return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
534 }
535 let h = name_hash(name);
536 let mut stmt = conn
537 .prepare_cached(
538 "SELECT id, type_id, obs_count, out_deg, in_deg
539 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
540 )
541 .map_err(sqlite_err)?;
542 match stmt.query_row(params![h, name], |row| {
543 Ok(EntityMeta {
544 id: row.get(0)?,
545 type_id: row.get(1)?,
546 obs_count: row.get(2)?,
547 out_deg: row.get(3)?,
548 in_deg: row.get(4)?,
549 })
550 }) {
551 Ok(m) => {
552 self.meta_set(name, m);
553 Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
554 }
555 Err(e) if is_not_found(&e) => Ok(None),
556 Err(e) => Err(sqlite_err(e)),
557 }
558 }
559
560 fn sync_seqs(&self, conn: &Connection) -> Result<()> {
561 let seq_e = self.seq_entity.load(Ordering::Relaxed);
562 let seq_o = self.seq_obs.load(Ordering::Relaxed);
563 conn.execute(
564 "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
565 WHERE key IN ('entity_seq', 'obs_seq')",
566 params![seq_e, seq_o],
567 )
568 .map_err(sqlite_err)?;
569 Ok(())
570 }
571
572 pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
575 if name.is_empty() {
576 return Ok(None);
577 }
578
579 if let Some(m) = self.meta_get(name) {
580 let conn = self.readers.get();
581 let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
582 let observations = load_observations_opt(&conn, m.id);
583 return Ok(Some(Entity {
584 name: name.to_string(),
585 entity_type: etype,
586 observations,
587 }));
588 }
589
590 let conn = self.readers.get();
591 let h = name_hash(name);
592 let mut stmt = conn
593 .prepare_cached(
594 "SELECT e.id, e.type_id, e.name, t.name,
595 e.obs_count, e.out_deg, e.in_deg
596 FROM entity e
597 JOIN type_dict t ON t.id = e.type_id
598 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
599 )
600 .map_err(sqlite_err)?;
601 match stmt.query_row(params![h, name], |row| {
602 let id: i64 = row.get(0)?;
603 let type_id: i64 = row.get(1)?;
604 let ename: String = row.get(2)?;
605 let etype: String = row.get(3)?;
606 let obs_count: i64 = row.get(4)?;
607 let out_deg: i64 = row.get(5)?;
608 let in_deg: i64 = row.get(6)?;
609 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
610 }) {
611 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
612 let observations = load_observations_opt(&conn, id);
613 drop(stmt);
614 drop(conn);
615 self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
616 Ok(Some(Entity {
617 name: ename,
618 entity_type: etype,
619 observations,
620 }))
621 }
622 Err(e) if is_not_found(&e) => Ok(None),
623 Err(e) => Err(sqlite_err(e)),
624 }
625 }
626
627 pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
628 let conn = self.writer.lock();
629 let tx = TxGuard::begin(&conn)?;
630
631 let mut ins_ent = conn
632 .prepare_cached(
633 "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
634 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
635 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
636 )
637 .map_err(sqlite_err)?;
638
639 let mut ins_fts = conn
640 .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
641 .map_err(sqlite_err)?;
642
643 let batch_ts = now_us();
644 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
645 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
646 let mut total_entities: i64 = 0;
647 let mut total_obs: i64 = 0;
648 let mut created = Vec::new();
649 let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
650 let mut obs_sql = String::new();
651
652 for entity in entities {
653 if entity.name.is_empty() {
654 continue;
655 }
656 let h = name_hash(&entity.name);
657 let id = self.next_entity_id();
658 let type_id = match type_cache.get(entity.entity_type.as_str()) {
659 Some(t) => *t,
660 None => {
661 let t = get_type_id(&conn, &entity.entity_type, 0)?;
662 type_cache.insert(entity.entity_type.clone(), t);
663 t
664 }
665 };
666 let obs_count = entity.observations.len() as i64;
667
668 let changed = ins_ent
669 .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
670 .map_err(sqlite_err)?;
671 if changed == 0 {
672 continue;
673 }
674
675 let n = entity.observations.len();
676 if n > 0 {
677 obs_sql.clear();
678
679 let mut oids = Vec::with_capacity(n);
680 let mut idxs = Vec::with_capacity(n);
681 for _ in 0..n {
682 oids.push(self.next_obs_id());
683 }
684 for i in 0..n as i64 {
685 idxs.push(i);
686 }
687
688 obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
689 for i in 0..n {
690 if i > 0 { obs_sql.push(','); }
691 obs_sql.push_str("(?,?,?,?,?)");
692 }
693
694 let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
695 for (i, obs) in entity.observations.iter().enumerate() {
696 obs_params.push(&oids[i]);
697 obs_params.push(&id);
698 obs_params.push(&idxs[i]);
699 obs_params.push(obs);
700 obs_params.push(&batch_ts);
701 }
702
703 conn.execute(&obs_sql, obs_params.as_slice())
704 .map_err(sqlite_err)?;
705 }
706
707 ins_fts
708 .execute(params![id, entity.name])
709 .map_err(sqlite_err)?;
710
711 *type_deltas.entry(type_id).or_insert(0) += 1;
712 total_entities += 1;
713 total_obs += obs_count;
714
715 created.push(entity.clone());
716 created_metas.push((entity.name.clone(), EntityMeta {
717 id,
718 type_id,
719 obs_count,
720 out_deg: 0,
721 in_deg: 0,
722 }));
723 }
724
725 if total_entities > 0 {
726 for (type_id, delta) in &type_deltas {
727 inc_type_count(&conn, *type_id, *delta)?;
728 }
729 inc_graph_stat(&conn, "entities", total_entities)?;
730 inc_graph_stat(&conn, "observations", total_obs)?;
731 self.sync_seqs(&conn)?;
732 }
733
734 tx.commit()?;
735
736 if !created_metas.is_empty() {
741 let mut cache = self.cache.lock();
742 for (name, meta) in &created_metas {
743 cache.put(name.clone(), *meta);
744 }
745 }
746
747 Ok(created)
748 }
749
750 pub fn delete_entities(&self, names: &[String]) -> Result<()> {
751 if names.is_empty() {
752 return Ok(());
753 }
754 let conn = self.writer.lock();
755
756 let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
758 let mut sel = conn
759 .prepare_cached(
760 "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
761 )
762 .map_err(sqlite_err)?;
763 for name in names {
764 let h = name_hash(name);
765 let (id, type_id) = match sel.query_row(params![h, name], |row| {
766 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
767 }) {
768 Ok(v) => v,
769 Err(e) if is_not_found(&e) => continue,
770 Err(e) => return Err(sqlite_err(e)),
771 };
772 resolved.push((id, type_id, name.clone()));
773 }
774
775 if resolved.is_empty() {
776 return Ok(());
777 }
778
779 let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
780 let n = ids.len();
781
782 let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
784 let obs_sql = format!(
785 "DELETE FROM observation WHERE entity_id IN ({})",
786 obs_p.join(",")
787 );
788 let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
789 let obs_deleted = conn
790 .execute(&obs_sql, obs_refs.as_slice())
791 .map_err(sqlite_err)? as i64;
792
793 let rel_sql = format!(
795 "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
796 obs_p.join(","),
797 obs_p.join(",")
798 );
799 let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
800 let rel_deleted = conn
801 .execute(&rel_sql, rel_refs.as_slice())
802 .map_err(sqlite_err)? as i64;
803
804 let fts_values: Vec<String> = (0..n)
806 .map(|_| "('delete', ?, '')".to_string())
807 .collect();
808 let fts_sql = format!(
809 "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
810 fts_values.join(", ")
811 );
812 conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
813 .map_err(sqlite_err)?;
814
815 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
817 for &(_, type_id, _) in &resolved {
818 *type_deltas.entry(type_id).or_insert(0) += 1;
819 }
820
821 if !type_deltas.is_empty() {
823 let m = type_deltas.len();
824 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
825 let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
826 let mut case_parts: Vec<String> = Vec::with_capacity(m);
827 let mut id_parts: Vec<String> = Vec::with_capacity(m);
828 for i in 0..m {
829 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
830 id_parts.push(format!("?{}", i + 1));
831 }
832 let sql = format!(
833 "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
834 case_parts.join(" "),
835 id_parts.join(","),
836 );
837 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
838 for id in &type_keys {
839 params.push(Box::new(*id));
840 }
841 for delta in &type_vals {
842 params.push(Box::new(*delta));
843 }
844 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
845 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
846 }
847
848 conn.execute(
850 &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
851 ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
852 )
853 .map_err(sqlite_err)?;
854
855 inc_graph_stat(&conn, "entities", -(n as i64))?;
857 inc_graph_stat(&conn, "observations", -obs_deleted)?;
858 inc_graph_stat(&conn, "relations", -rel_deleted)?;
859
860 for (_, _, name) in &resolved {
862 self.meta_remove(name);
863 }
864
865 Ok(())
866 }
867
868 pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
869 let conn = self.writer.lock();
870 let tx = TxGuard::begin(&conn)?;
871
872 let mut ins = conn
873 .prepare_cached(
874 "INSERT INTO relation (from_id, to_id, type_id, created_us)
875 SELECT ?1, ?2, ?3, ?4
876 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
877 )
878 .map_err(sqlite_err)?;
879
880 let ts = now_us();
881 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
882 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
883 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
884 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
885 let mut total_relations: i64 = 0;
886 let mut created = Vec::new();
887
888 for rel in relations {
889 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
890 Some(v) => v,
891 None => continue,
892 };
893 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
894 Some(v) => v,
895 None => continue,
896 };
897 let type_id = match type_cache.get(rel.relation_type.as_str()) {
898 Some(t) => *t,
899 None => {
900 let t = get_type_id(&conn, &rel.relation_type, 1)?;
901 type_cache.insert(rel.relation_type.clone(), t);
902 t
903 }
904 };
905
906 let changed = ins
907 .execute(params![from_id, to_id, type_id, ts])
908 .map_err(sqlite_err)?;
909 if changed == 0 {
910 continue;
911 }
912
913 *out_deltas.entry(from_id).or_insert(0) += 1;
914 *in_deltas.entry(to_id).or_insert(0) += 1;
915 *type_deltas.entry(type_id).or_insert(0) += 1;
916 total_relations += 1;
917
918 created.push(rel.clone());
919 }
920
921 if total_relations > 0 {
922 for (id, delta) in &out_deltas {
923 conn.execute(
924 "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
925 params![delta, id],
926 )
927 .map_err(sqlite_err)?;
928 }
929 for (id, delta) in in_deltas {
930 conn.execute(
931 "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
932 params![delta, id],
933 )
934 .map_err(sqlite_err)?;
935 }
936 for (type_id, delta) in &type_deltas {
937 inc_type_count(&conn, *type_id, *delta)?;
938 }
939 inc_graph_stat(&conn, "relations", total_relations)?;
940 }
941
942 tx.commit()?;
943
944 if !created.is_empty() {
947 let mut cache = self.cache.lock();
948 for rel in &created {
949 if let Some(m) = cache.get_mut(&rel.from) {
950 m.out_deg += 1;
951 }
952 if let Some(m) = cache.get_mut(&rel.to) {
953 m.in_deg += 1;
954 }
955 }
956 }
957
958 Ok(created)
959 }
960
961 pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
962 if relations.is_empty() {
963 return Ok(());
964 }
965 let conn = self.writer.lock();
966
967 let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
969 let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
970 for rel in relations {
971 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
972 Some(v) => v,
973 None => continue,
974 };
975 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
976 Some(v) => v,
977 None => continue,
978 };
979 let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
980 Ok(id) => id,
981 Err(_) => continue,
982 };
983 triples.push((from_id, to_id, type_id));
984 names.push((rel.from.clone(), rel.to.clone()));
985 }
986
987 if triples.is_empty() {
988 return Ok(());
989 }
990
991 let mut sql = String::from(
993 "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
994 );
995 for (i, _) in triples.iter().enumerate() {
996 if i > 0 {
997 sql.push_str(", ");
998 }
999 let base = (i * 3) + 1;
1000 sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1001 }
1002 sql.push(')');
1003
1004 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1005 for &(f, t, tp) in &triples {
1006 param_values.push(Box::new(f));
1007 param_values.push(Box::new(t));
1008 param_values.push(Box::new(tp));
1009 }
1010 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1011 let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1012 if total == 0 {
1013 return Ok(());
1014 }
1015
1016 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1018 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1019 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1020 for &(from_id, to_id, type_id) in &triples {
1021 *out_deltas.entry(from_id).or_insert(0) += 1;
1022 *in_deltas.entry(to_id).or_insert(0) += 1;
1023 *type_deltas.entry(type_id).or_insert(0) += 1;
1024 }
1025
1026 let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1028 let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1029 if !out_keys.is_empty() {
1030 let m = out_keys.len();
1031 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1032 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1033 for i in 0..m {
1034 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1035 id_parts.push(format!("?{}", i + 1));
1036 }
1037 let sql = format!(
1038 "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1039 case_parts.join(" "),
1040 id_parts.join(","),
1041 );
1042 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1043 for id in &out_keys {
1044 params.push(Box::new(*id));
1045 }
1046 for delta in &out_vals {
1047 params.push(Box::new(*delta));
1048 }
1049 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1050 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1051 }
1052
1053 let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1055 let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1056 if !in_keys.is_empty() {
1057 let m = in_keys.len();
1058 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1059 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1060 for i in 0..m {
1061 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1062 id_parts.push(format!("?{}", i + 1));
1063 }
1064 let sql = format!(
1065 "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1066 case_parts.join(" "),
1067 id_parts.join(","),
1068 );
1069 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1070 for id in &in_keys {
1071 params.push(Box::new(*id));
1072 }
1073 for delta in &in_vals {
1074 params.push(Box::new(*delta));
1075 }
1076 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1077 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1078 }
1079
1080 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1082 let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1083 if !type_keys.is_empty() {
1084 let m = type_keys.len();
1085 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1086 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1087 for i in 0..m {
1088 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1089 id_parts.push(format!("?{}", i + 1));
1090 }
1091 let sql = format!(
1092 "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1093 case_parts.join(" "),
1094 id_parts.join(","),
1095 );
1096 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1097 for id in &type_keys {
1098 params.push(Box::new(*id));
1099 }
1100 for delta in &type_vals {
1101 params.push(Box::new(*delta));
1102 }
1103 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1104 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1105 }
1106
1107 inc_graph_stat(&conn, "relations", -(total as i64))?;
1108
1109 for (from, to) in &names {
1112 self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1113 self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1114 }
1115
1116 Ok(())
1117 }
1118
1119 pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1120 let conn = self.writer.lock();
1121 let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1122 Some(v) => v,
1123 None => {
1124 return Err(MCSError::InvalidParams(format!(
1125 "Entity '{entity_name}' not found"
1126 )))
1127 }
1128 };
1129
1130 let mut max_idx: i64 = conn
1131 .query_row(
1132 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1133 params![id],
1134 |row| row.get(0),
1135 )
1136 .map_err(sqlite_err)?;
1137
1138 let ts = now_us();
1139 let mut ins_obs = conn
1140 .prepare_cached(
1141 "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1142 )
1143 .map_err(sqlite_err)?;
1144
1145 for content in contents {
1146 max_idx += 1;
1147 let oid = self.next_obs_id();
1148 ins_obs
1149 .execute(params![oid, id, max_idx, content, ts])
1150 .map_err(sqlite_err)?;
1151 }
1152 let added = contents.to_vec();
1153
1154 let count: i64 = contents.len() as i64;
1155 conn.execute(
1156 "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1157 params![count, ts, id],
1158 )
1159 .map_err(sqlite_err)?;
1160
1161 inc_graph_stat(&conn, "observations", count)?;
1162 self.sync_seqs(&conn)?;
1163
1164 self.meta_update(entity_name, |m| m.obs_count += count);
1165
1166 Ok(added)
1167 }
1168
1169 pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1170 if observations.is_empty() {
1171 return Ok(());
1172 }
1173 let conn = self.writer.lock();
1174 let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1175 Some(v) => v,
1176 None => {
1177 return Err(MCSError::InvalidParams(format!(
1178 "Entity '{entity_name}' not found"
1179 )))
1180 }
1181 };
1182
1183 let placeholders: Vec<String> = (0..observations.len())
1184 .map(|i| format!("?{}", i + 2))
1185 .collect();
1186 let sql = format!(
1187 "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1188 placeholders.join(",")
1189 );
1190
1191 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1192 param_values.push(Box::new(id));
1193 for obs in observations {
1194 param_values.push(Box::new(obs.as_str()));
1195 }
1196 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1197 let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1198
1199 if removed > 0 {
1200 conn.execute(
1201 "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1202 params![removed, now_us(), id],
1203 )
1204 .map_err(sqlite_err)?;
1205 inc_graph_stat(&conn, "observations", -removed)?;
1206
1207 self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1208 }
1209
1210 Ok(())
1211 }
1212
1213 pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1214 let mut results = Vec::new();
1215 for entity in entities {
1216 if let Some(existing) = self.get_entity(&entity.name)? {
1217 if existing.entity_type != entity.entity_type {
1219 let conn = self.writer.lock();
1220 let old_type_id = conn
1221 .query_row(
1222 "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1223 params![name_hash(&entity.name), entity.name],
1224 |row| row.get::<_, i64>(0),
1225 )
1226 .map_err(sqlite_err)?;
1227 let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1228 inc_type_count(&conn, old_type_id, -1)?;
1229 inc_type_count(&conn, new_type_id, 1)?;
1230 conn.execute(
1231 "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1232 params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1233 )
1234 .map_err(sqlite_err)?;
1235 self.meta_remove(&entity.name);
1237 }
1238 let existing_set: HashSet<&str> =
1240 existing.observations.iter().map(|s| s.as_str()).collect();
1241 let to_add: Vec<String> = entity
1242 .observations
1243 .iter()
1244 .filter(|o| !existing_set.contains(o.as_str()))
1245 .cloned()
1246 .collect();
1247 if !to_add.is_empty() {
1248 self.add_observations(&entity.name, &to_add)?;
1249 }
1250 let updated = self
1251 .get_entity(&entity.name)?
1252 .unwrap_or(entity.clone());
1253 results.push(updated);
1254 } else {
1255 let c = self.create_entities(&[entity.clone()])?;
1256 if let Some(e) = c.into_iter().next() {
1257 results.push(e);
1258 }
1259 }
1260 }
1261 Ok(results)
1262 }
1263
1264 pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1265 let conn = self.writer.lock();
1266 let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1267 Some(v) => v,
1268 None => {
1269 return Err(MCSError::InvalidParams(format!(
1270 "Source entity '{source}' not found"
1271 )))
1272 }
1273 };
1274 let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1275 Some(v) => v,
1276 None => {
1277 return Err(MCSError::InvalidParams(format!(
1278 "Target entity '{target}' not found"
1279 )))
1280 }
1281 };
1282
1283 if src_id == tgt_id {
1284 return self.get_entity(target)?.ok_or_else(|| {
1285 MCSError::InvalidParams("Target entity not found after merge".into())
1286 });
1287 }
1288
1289 let mut obs_count: i64 = 0;
1291 {
1292 let mut max_idx: i64 = conn
1293 .query_row(
1294 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1295 params![tgt_id],
1296 |row| row.get(0),
1297 )
1298 .map_err(sqlite_err)?;
1299 let mut sel_obs = conn
1300 .prepare_cached(
1301 "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1302 )
1303 .map_err(sqlite_err)?;
1304 let mut upd_obs = conn
1305 .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1306 .map_err(sqlite_err)?;
1307 let rows: Vec<(i64, String)> = sel_obs
1308 .query_map(params![src_id], |row| {
1309 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1310 })
1311 .map_err(sqlite_err)?
1312 .filter_map(|r| r.ok())
1313 .collect();
1314 for (oid, _body) in &rows {
1315 max_idx += 1;
1316 upd_obs
1317 .execute(params![tgt_id, max_idx, oid])
1318 .map_err(sqlite_err)?;
1319 obs_count += 1;
1320 }
1321 }
1322
1323 conn.execute(
1325 "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1326 params![tgt_id, src_id],
1327 )
1328 .map_err(sqlite_err)?;
1329 conn.execute(
1330 "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1331 params![tgt_id, src_id],
1332 )
1333 .map_err(sqlite_err)?;
1334 conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1338 .map_err(sqlite_err)?;
1339 conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1340 .map_err(sqlite_err)?;
1341
1342 let out_add: i64 = conn
1344 .query_row(
1345 "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1346 params![tgt_id],
1347 |row| row.get(0),
1348 )
1349 .map_err(sqlite_err)?;
1350 let in_add: i64 = conn
1351 .query_row(
1352 "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1353 params![tgt_id],
1354 |row| row.get(0),
1355 )
1356 .map_err(sqlite_err)?;
1357 conn.execute(
1358 "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1359 params![out_add, in_add, obs_count, now_us(), tgt_id],
1360 )
1361 .map_err(sqlite_err)?;
1362
1363 conn.execute(
1365 "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1366 params![src_id],
1367 )
1368 .map_err(sqlite_err)?;
1369 conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1370 .map_err(sqlite_err)?;
1371
1372 inc_graph_stat(&conn, "entities", -1)?;
1373 self.meta_remove(source);
1374
1375 if let Ok(meta) = conn.query_row(
1377 "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1378 params![tgt_id],
1379 |row| {
1380 Ok(EntityMeta {
1381 id: row.get(0)?,
1382 type_id: row.get(1)?,
1383 obs_count: row.get(2)?,
1384 out_deg: row.get(3)?,
1385 in_deg: row.get(4)?,
1386 })
1387 },
1388 ) {
1389 self.meta_set(target, meta);
1390 }
1391
1392 let (name, etype): (String, String) = conn
1393 .query_row(
1394 "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1395 params![tgt_id],
1396 |row| Ok((row.get(0)?, row.get(1)?)),
1397 )
1398 .map_err(sqlite_err)?;
1399 let observations = load_observations_opt(&conn, tgt_id);
1400
1401 Ok(Entity {
1402 name,
1403 entity_type: etype,
1404 observations,
1405 })
1406 }
1407
1408 pub fn search_nodes_filtered(
1409 &self,
1410 query: &str,
1411 filter_type: Option<&str>,
1412 offset: usize,
1413 limit: usize,
1414 ) -> Vec<Entity> {
1415 if query.is_empty() {
1416 return Vec::new();
1417 }
1418 let conn = self.readers.get();
1419
1420 let mut entity_ids: Vec<i64> = Vec::new();
1422 let mut seen: HashSet<i64> = HashSet::new();
1423
1424 if let Ok(mut stmt) = conn.prepare(
1425 "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1426 ) {
1427 let limit_i64 = (limit + offset) as i64;
1428 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1429 row.get::<_, i64>(0)
1430 }) {
1431 for row in rows.flatten() {
1432 if seen.insert(row) {
1433 entity_ids.push(row);
1434 }
1435 }
1436 }
1437 }
1438
1439 if let Ok(mut stmt) = conn.prepare(
1440 "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1441 WHERE obs_fts MATCH ?1
1442 GROUP BY entity_id
1443 LIMIT ?2",
1444 ) {
1445 let limit_i64 = (limit + offset) as i64;
1446 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1447 row.get::<_, i64>(0)
1448 }) {
1449 for row in rows.flatten() {
1450 if seen.insert(row) {
1451 entity_ids.push(row);
1452 }
1453 }
1454 }
1455 }
1456
1457 let mut results = Vec::new();
1459 let mut count: usize = 0;
1460 for eid in entity_ids {
1461 if let Ok(entity) = entity_by_id(&conn, eid) {
1462 if let Some(ft) = filter_type {
1463 if !ft.is_empty() && entity.entity_type != ft {
1464 continue;
1465 }
1466 }
1467 if count < offset {
1468 count += 1;
1469 continue;
1470 }
1471 if results.len() >= limit {
1472 break;
1473 }
1474 results.push(entity);
1475 count += 1;
1476 }
1477 }
1478
1479 results
1480 }
1481
1482 pub fn read_graph_filtered(
1483 &self,
1484 filter_type: Option<&str>,
1485 offset: usize,
1486 limit: usize,
1487 ) -> Result<String> {
1488 let conn = self.readers.get();
1489
1490 let limit_sql: i64 = if limit == usize::MAX {
1491 -1
1492 } else {
1493 limit.min(i64::MAX as usize) as i64
1494 };
1495 let offset_sql: i64 = offset as i64;
1496
1497 let filter = filter_type.filter(|ft| !ft.is_empty());
1503 let ids: Vec<i64> = if let Some(ft) = filter {
1504 let mut stmt = conn
1505 .prepare_cached(
1506 "SELECT e.id FROM entity e
1507 WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1508 AND e.flags = 0
1509 ORDER BY e.id LIMIT ?2 OFFSET ?3",
1510 )
1511 .map_err(sqlite_err)?;
1512 stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1513 .map_err(sqlite_err)?
1514 .filter_map(|r| r.ok())
1515 .collect()
1516 } else {
1517 let mut stmt = conn
1518 .prepare_cached(
1519 "SELECT e.id FROM entity e WHERE e.flags = 0
1520 ORDER BY e.id LIMIT ?1 OFFSET ?2",
1521 )
1522 .map_err(sqlite_err)?;
1523 stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1524 .map_err(sqlite_err)?
1525 .filter_map(|r| r.ok())
1526 .collect()
1527 };
1528
1529 if ids.is_empty() {
1530 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1531 }
1532
1533 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1534
1535 let entities_json: String = {
1536 let sql = format!(
1537 "SELECT COALESCE(json_group_array(json_object(
1538 'name', e.name,
1539 'entityType', t.name,
1540 'observations', COALESCE((
1541 SELECT json_group_array(o.body ORDER BY o.idx)
1542 FROM observation o WHERE o.entity_id = e.id
1543 ), json('[]'))
1544 ) ORDER BY e.id), json('[]'))
1545 FROM entity e
1546 JOIN type_dict t ON t.id = e.type_id
1547 WHERE e.id IN ({placeholders}) AND e.flags = 0"
1548 );
1549 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1550 row.get::<_, String>(0)
1551 })
1552 .map_err(sqlite_err)?
1553 };
1554
1555 let relations_json: String = {
1556 let sql = format!(
1557 "SELECT COALESCE(json_group_array(json_object(
1558 'from', e1.name,
1559 'to', e2.name,
1560 'relationType', t.name
1561 )), json('[]'))
1562 FROM relation r
1563 JOIN entity e1 ON e1.id = r.from_id
1564 JOIN entity e2 ON e2.id = r.to_id
1565 JOIN type_dict t ON t.id = r.type_id
1566 WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1567 AND e1.flags = 0 AND e2.flags = 0"
1568 );
1569 let all_params: Vec<&dyn ToSql> = ids
1570 .iter()
1571 .map(|id| id as &dyn ToSql)
1572 .chain(ids.iter().map(|id| id as &dyn ToSql))
1573 .collect();
1574 conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1575 .map_err(sqlite_err)?
1576 };
1577
1578 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1579 out.push_str("{\"entities\":");
1580 out.push_str(&entities_json);
1581 out.push_str(",\"relations\":");
1582 out.push_str(&relations_json);
1583 out.push('}');
1584 Ok(out)
1585 }
1586
1587 pub fn open_nodes(&self, names: &[String]) -> String {
1588 let conn = self.readers.get();
1589 let mut entity_ids: Vec<i64> = Vec::new();
1590
1591 for name in names {
1592 let h = name_hash(name);
1593 if let Ok(Some(id)) = conn
1594 .query_row(
1595 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1596 params![h, name],
1597 |row| row.get::<_, i64>(0),
1598 )
1599 .map(Some)
1600 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1601 {
1602 entity_ids.push(id);
1603 }
1604 }
1605
1606 if entity_ids.is_empty() {
1607 return r#"{"entities":[],"relations":[]}"#.to_string();
1608 }
1609
1610 let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1611 let ids_str = placeholders.join(",");
1612
1613 let entities_json: String = {
1614 let sql = format!(
1615 "SELECT COALESCE(json_group_array(json_object(
1616 'name', e.name,
1617 'entityType', t.name,
1618 'observations', COALESCE((
1619 SELECT json_group_array(o.body ORDER BY o.idx)
1620 FROM observation o WHERE o.entity_id = e.id
1621 ), json('[]'))
1622 ) ORDER BY e.id), json('[]'))
1623 FROM entity e
1624 JOIN type_dict t ON t.id = e.type_id
1625 WHERE e.id IN ({ids_str}) AND e.flags = 0"
1626 );
1627 conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1628 row.get::<_, String>(0)
1629 })
1630 .unwrap_or_else(|_| "[]".to_string())
1631 };
1632
1633 let relations_json: String = {
1634 let sql = format!(
1635 "SELECT COALESCE(json_group_array(json_object(
1636 'from', e1.name,
1637 'to', e2.name,
1638 'relationType', t.name
1639 )), json('[]'))
1640 FROM relation r
1641 JOIN entity e1 ON e1.id = r.from_id
1642 JOIN entity e2 ON e2.id = r.to_id
1643 JOIN type_dict t ON t.id = r.type_id
1644 WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1645 AND e1.flags = 0 AND e2.flags = 0"
1646 );
1647 let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1648 .iter()
1649 .map(|id| id as &dyn rusqlite::types::ToSql)
1650 .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1651 .collect();
1652 let mut stmt = conn.prepare(&sql).unwrap();
1653 stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1654 .unwrap_or_else(|_| "[]".to_string())
1655 };
1656
1657 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1658 out.push_str("{\"entities\":");
1659 out.push_str(&entities_json);
1660 out.push_str(",\"relations\":");
1661 out.push_str(&relations_json);
1662 out.push('}');
1663 out
1664 }
1665
1666 pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1667 let conn = self.readers.get();
1668 let mut results = Vec::with_capacity(names.len());
1669 for name in names {
1670 let h = name_hash(name);
1671 let exists: bool = conn
1672 .query_row(
1673 "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1674 params![h, name],
1675 |_| Ok(()),
1676 )
1677 .is_ok();
1678 results.push(exists);
1679 }
1680 Ok(results)
1681 }
1682
1683 pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1684 let conn = self.readers.get();
1685 let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1686 Some(v) => v,
1687 None => {
1688 return Err(MCSError::InvalidParams(format!(
1689 "Entity '{name}' not found"
1690 )))
1691 }
1692 };
1693 Ok(match direction {
1694 Direction::Outgoing => out_d as usize,
1695 Direction::Incoming => in_d as usize,
1696 Direction::Both => (out_d + in_d) as usize,
1697 })
1698 }
1699
1700 pub fn get_entity_count(&self) -> Result<usize> {
1701 let conn = self.readers.get();
1702 read_graph_stat(&conn, "entities")
1703 .map(|v| v as usize)
1704 .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1705 }
1706
1707 pub fn get_relation_count(&self) -> Result<usize> {
1708 let conn = self.readers.get();
1709 read_graph_stat(&conn, "relations")
1710 .map(|v| v as usize)
1711 .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1712 }
1713
1714 pub fn search_relations(
1715 &self,
1716 from: Option<&str>,
1717 to: Option<&str>,
1718 rtype: Option<&str>,
1719 ) -> Vec<Relation> {
1720 let conn = self.readers.get();
1721 let mut results = Vec::new();
1722
1723 let from_id = from
1729 .filter(|f| !f.is_empty())
1730 .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1731 let to_id = to
1732 .filter(|t| !t.is_empty())
1733 .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1734 let type_id = rtype
1735 .filter(|rt| !rt.is_empty())
1736 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1737
1738 match (from_id, to_id, type_id) {
1739 (Some(fid), Some(tid), Some(tpid)) => {
1740 if let Ok(mut stmt) = conn.prepare_cached(
1741 "SELECT e1.name, e2.name, t.name
1742 FROM relation r
1743 JOIN entity e1 ON e1.id = r.from_id
1744 JOIN entity e2 ON e2.id = r.to_id
1745 JOIN type_dict t ON t.id = r.type_id
1746 WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1747 AND e1.flags = 0 AND e2.flags = 0
1748 ORDER BY r.from_id, r.to_id"
1749 ) {
1750 if let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1751 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1752 }) {
1753 for row in rows.flatten() { results.push(row); }
1754 }
1755 }
1756 }
1757 (Some(fid), Some(tid), None) => {
1758 if let Ok(mut stmt) = conn.prepare_cached(
1759 "SELECT e1.name, e2.name, t.name
1760 FROM relation r
1761 JOIN entity e1 ON e1.id = r.from_id
1762 JOIN entity e2 ON e2.id = r.to_id
1763 JOIN type_dict t ON t.id = r.type_id
1764 WHERE r.from_id = ?1 AND r.to_id = ?2
1765 AND e1.flags = 0 AND e2.flags = 0
1766 ORDER BY r.from_id, r.to_id"
1767 ) {
1768 if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1769 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1770 }) {
1771 for row in rows.flatten() { results.push(row); }
1772 }
1773 }
1774 }
1775 (Some(fid), None, Some(tpid)) => {
1776 if let Ok(mut stmt) = conn.prepare_cached(
1777 "SELECT e1.name, e2.name, t.name
1778 FROM relation r
1779 JOIN entity e1 ON e1.id = r.from_id
1780 JOIN entity e2 ON e2.id = r.to_id
1781 JOIN type_dict t ON t.id = r.type_id
1782 WHERE r.from_id = ?1 AND r.type_id = ?2
1783 AND e1.flags = 0 AND e2.flags = 0
1784 ORDER BY r.from_id, r.to_id"
1785 ) {
1786 if let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1787 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1788 }) {
1789 for row in rows.flatten() { results.push(row); }
1790 }
1791 }
1792 }
1793 (None, Some(tid), Some(tpid)) => {
1794 if let Ok(mut stmt) = conn.prepare_cached(
1795 "SELECT e1.name, e2.name, t.name
1796 FROM relation r
1797 JOIN entity e1 ON e1.id = r.from_id
1798 JOIN entity e2 ON e2.id = r.to_id
1799 JOIN type_dict t ON t.id = r.type_id
1800 WHERE r.to_id = ?1 AND r.type_id = ?2
1801 AND e1.flags = 0 AND e2.flags = 0
1802 ORDER BY r.from_id, r.to_id"
1803 ) {
1804 if let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1805 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1806 }) {
1807 for row in rows.flatten() { results.push(row); }
1808 }
1809 }
1810 }
1811 (Some(fid), None, None) => {
1812 if let Ok(mut stmt) = conn.prepare_cached(
1813 "SELECT e1.name, e2.name, t.name
1814 FROM relation r
1815 JOIN entity e1 ON e1.id = r.from_id
1816 JOIN entity e2 ON e2.id = r.to_id
1817 JOIN type_dict t ON t.id = r.type_id
1818 WHERE r.from_id = ?1
1819 AND e1.flags = 0 AND e2.flags = 0
1820 ORDER BY r.from_id, r.to_id"
1821 ) {
1822 if let Ok(rows) = stmt.query_map(params![fid], |row| {
1823 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1824 }) {
1825 for row in rows.flatten() { results.push(row); }
1826 }
1827 }
1828 }
1829 (None, Some(tid), None) => {
1830 if let Ok(mut stmt) = conn.prepare_cached(
1831 "SELECT e1.name, e2.name, t.name
1832 FROM relation r
1833 JOIN entity e1 ON e1.id = r.from_id
1834 JOIN entity e2 ON e2.id = r.to_id
1835 JOIN type_dict t ON t.id = r.type_id
1836 WHERE r.to_id = ?1
1837 AND e1.flags = 0 AND e2.flags = 0
1838 ORDER BY r.from_id, r.to_id"
1839 ) {
1840 if let Ok(rows) = stmt.query_map(params![tid], |row| {
1841 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1842 }) {
1843 for row in rows.flatten() { results.push(row); }
1844 }
1845 }
1846 }
1847 (None, None, Some(tpid)) => {
1848 if let Ok(mut stmt) = conn.prepare_cached(
1849 "SELECT e1.name, e2.name, t.name
1850 FROM relation r
1851 JOIN entity e1 ON e1.id = r.from_id
1852 JOIN entity e2 ON e2.id = r.to_id
1853 JOIN type_dict t ON t.id = r.type_id
1854 WHERE r.type_id = ?1
1855 AND e1.flags = 0 AND e2.flags = 0
1856 ORDER BY r.from_id, r.to_id"
1857 ) {
1858 if let Ok(rows) = stmt.query_map(params![tpid], |row| {
1859 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1860 }) {
1861 for row in rows.flatten() { results.push(row); }
1862 }
1863 }
1864 }
1865 (None, None, None) => {
1866 if let Ok(mut stmt) = conn.prepare_cached(
1867 "SELECT e1.name, e2.name, t.name
1868 FROM relation r
1869 JOIN entity e1 ON e1.id = r.from_id
1870 JOIN entity e2 ON e2.id = r.to_id
1871 JOIN type_dict t ON t.id = r.type_id
1872 WHERE e1.flags = 0 AND e2.flags = 0
1873 ORDER BY r.from_id, r.to_id"
1874 ) {
1875 if let Ok(rows) = stmt.query_map([], |row| {
1876 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1877 }) {
1878 for row in rows.flatten() { results.push(row); }
1879 }
1880 }
1881 }
1882 }
1883 results
1884 }
1885
1886 pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1887 let conn = self.readers.get();
1888 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1889 Some(v) => v,
1890 None => {
1891 return Err(MCSError::InvalidParams(format!(
1892 "Source entity '{from}' not found"
1893 )))
1894 }
1895 };
1896 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1897 Some(v) => v,
1898 None => {
1899 return Err(MCSError::InvalidParams(format!(
1900 "Target entity '{to}' not found"
1901 )))
1902 }
1903 };
1904
1905 if from_id == to_id {
1906 return Ok(Some(vec![from.to_string()]));
1907 }
1908
1909 let mut visited = HashSet::new();
1911 let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1912 let mut queue = VecDeque::new();
1913 visited.insert(from_id);
1914 queue.push_back(from_id);
1915
1916 while let Some(cur) = queue.pop_front() {
1917 if cur == to_id {
1918 break;
1919 }
1920 if let Ok(mut stmt) =
1922 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1923 {
1924 if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1925 for row in rows.flatten() {
1926 if visited.insert(row) {
1927 parent.insert(row, cur);
1928 queue.push_back(row);
1929 }
1930 }
1931 }
1932 }
1933 if let Ok(mut stmt) =
1935 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1936 {
1937 if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1938 for row in rows.flatten() {
1939 if visited.insert(row) {
1940 parent.insert(row, cur);
1941 queue.push_back(row);
1942 }
1943 }
1944 }
1945 }
1946 }
1947
1948 if !parent.contains_key(&to_id) && to_id != from_id {
1949 return Ok(None);
1950 }
1951
1952 let mut path = Vec::new();
1953 let mut cur = to_id;
1954 path.push(cur);
1955 while let Some(&p) = parent.get(&cur) {
1956 path.push(p);
1957 cur = p;
1958 if cur == from_id {
1959 break;
1960 }
1961 }
1962 path.reverse();
1963
1964 let mut name_path = Vec::with_capacity(path.len());
1965 for id in path {
1966 if let Ok(name) = conn.query_row(
1967 "SELECT name FROM entity WHERE id = ?1",
1968 params![id],
1969 |row| row.get::<_, String>(0),
1970 ) {
1971 name_path.push(name);
1972 }
1973 }
1974
1975 Ok(Some(name_path))
1976 }
1977
1978 pub fn compact(&self) -> Result<()> {
1979 let conn = self.writer.lock();
1980 conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
1981 Ok(())
1982 }
1983
1984 pub fn neighbors(
1985 &self,
1986 name: &str,
1987 direction: Direction,
1988 rtype: Option<&str>,
1989 depth: u32,
1990 ) -> Result<String> {
1991 self._traverse(name, direction, rtype, depth, true)
1992 }
1993
1994 pub fn extract_subgraph(
1995 &self,
1996 names: &[String],
1997 depth: u32,
1998 ) -> Result<String> {
1999 if names.is_empty() {
2000 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2001 }
2002
2003 let conn = self.readers.get();
2004 let mut all_entity_ids: HashSet<i64> = HashSet::new();
2005 let mut frontier: HashSet<i64> = HashSet::new();
2006 let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2007
2008 for name in names {
2010 let h = name_hash(name);
2011 if let Ok(Some(id)) = conn
2012 .query_row(
2013 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2014 params![h, name],
2015 |row| row.get::<_, i64>(0),
2016 )
2017 .map(Some)
2018 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2019 {
2020 all_entity_ids.insert(id);
2021 frontier.insert(id);
2022 }
2023 }
2024
2025 let mut current_depth = 0u32;
2026 while current_depth < depth && !frontier.is_empty() {
2027 let mut next_frontier: HashSet<i64> = HashSet::new();
2028
2029 for fid in &frontier {
2031 if let Ok(mut stmt) = conn.prepare_cached(
2032 "SELECT from_id, to_id, type_id FROM relation WHERE from_id = ?1",
2033 ) {
2034 if let Ok(rows) =
2035 stmt.query_map(params![fid], |row| {
2036 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2037 })
2038 {
2039 for row in rows.flatten() {
2040 let (from_id, to_id, type_id) = row;
2041 all_rel_pairs.insert((from_id, to_id, type_id));
2042 if all_entity_ids.insert(to_id) {
2043 next_frontier.insert(to_id);
2044 }
2045 }
2046 }
2047 }
2048 if let Ok(mut stmt) = conn.prepare_cached(
2049 "SELECT from_id, to_id, type_id FROM relation WHERE to_id = ?1",
2050 ) {
2051 if let Ok(rows) =
2052 stmt.query_map(params![fid], |row| {
2053 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2054 })
2055 {
2056 for row in rows.flatten() {
2057 let (from_id, to_id, type_id) = row;
2058 all_rel_pairs.insert((from_id, to_id, type_id));
2059 if all_entity_ids.insert(from_id) {
2060 next_frontier.insert(from_id);
2061 }
2062 }
2063 }
2064 }
2065 }
2066 frontier = next_frontier;
2067 current_depth += 1;
2068 }
2069
2070 let entities_json: String = {
2071 if all_entity_ids.is_empty() {
2072 "[]".to_string()
2073 } else {
2074 let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2075 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2076 let sql = format!(
2077 "SELECT COALESCE(json_group_array(json_object(
2078 'name', e.name,
2079 'entityType', t.name,
2080 'observations', COALESCE((
2081 SELECT json_group_array(o.body ORDER BY o.idx)
2082 FROM observation o WHERE o.entity_id = e.id
2083 ), json('[]'))
2084 ) ORDER BY e.id), json('[]'))
2085 FROM entity e
2086 JOIN type_dict t ON t.id = e.type_id
2087 WHERE e.id IN ({}) AND e.flags = 0",
2088 placeholders.join(",")
2089 );
2090 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2091 row.get::<_, String>(0)
2092 })
2093 .unwrap_or_else(|_| "[]".to_string())
2094 }
2095 };
2096
2097 let relations_json: String = {
2098 if all_rel_pairs.is_empty() {
2099 "[]".to_string()
2100 } else {
2101 let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2102 let sql = format!(
2103 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2104 SELECT COALESCE(json_group_array(json_object(
2105 'from', e1.name,
2106 'to', e2.name,
2107 'relationType', t.name
2108 )), json('[]'))
2109 FROM r
2110 JOIN entity e1 ON e1.id = r.from_id
2111 JOIN entity e2 ON e2.id = r.to_id
2112 JOIN type_dict t ON t.id = r.type_id
2113 WHERE e1.flags = 0 AND e2.flags = 0",
2114 vals.join(", ")
2115 );
2116 let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2117 .flat_map(|(f, t, tp)| {
2118 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2119 })
2120 .collect();
2121 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2122 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2123 .unwrap_or_else(|_| "[]".to_string())
2124 }
2125 };
2126
2127 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2128 out.push_str("{\"entities\":");
2129 out.push_str(&entities_json);
2130 out.push_str(",\"relations\":");
2131 out.push_str(&relations_json);
2132 out.push('}');
2133 Ok(out)
2134 }
2135
2136 pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2137 self.get_entity(name)?
2138 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2139 }
2140
2141 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2142 let conn = self.readers.get();
2143 select_all_types(&conn, 0).unwrap_or_default()
2144 }
2145
2146 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2147 let conn = self.readers.get();
2148 select_all_types(&conn, 1).unwrap_or_default()
2149 }
2150
2151 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2152 names
2153 .iter()
2154 .map(|n| self.get_entity(n).unwrap_or(None))
2155 .collect()
2156 }
2157
2158 pub fn find_all_paths(
2159 &self,
2160 from: &str,
2161 to: &str,
2162 max_depth: usize,
2163 max_paths: usize,
2164 ) -> Result<Vec<Vec<String>>> {
2165 let conn = self.readers.get();
2166 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2167 Some(v) => v,
2168 None => {
2169 return Err(MCSError::InvalidParams(format!(
2170 "Source entity '{from}' not found"
2171 )))
2172 }
2173 };
2174 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2175 Some(v) => v,
2176 None => {
2177 return Err(MCSError::InvalidParams(format!(
2178 "Target entity '{to}' not found"
2179 )))
2180 }
2181 };
2182
2183 if from_id == to_id {
2184 return Ok(vec![vec![from.to_string()]]);
2185 }
2186
2187 let mut all_paths: Vec<Vec<i64>> = Vec::new();
2189 let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2190 queue.push_back((from_id, vec![from_id]));
2191
2192 while let Some((cur, path)) = queue.pop_front() {
2193 if all_paths.len() >= max_paths {
2194 break;
2195 }
2196 if path.len() > max_depth {
2197 continue;
2198 }
2199
2200 if let Ok(mut stmt) =
2202 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2203 {
2204 if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2205 for next_id in rows.flatten() {
2206 if next_id == to_id {
2207 let mut full_path = path.clone();
2208 full_path.push(next_id);
2209 all_paths.push(full_path);
2210 if all_paths.len() >= max_paths {
2211 break;
2212 }
2213 } else if !path.contains(&next_id) && path.len() < max_depth {
2214 let mut new_path = path.clone();
2215 new_path.push(next_id);
2216 queue.push_back((next_id, new_path));
2217 }
2218 }
2219 }
2220 }
2221
2222 if let Ok(mut stmt) =
2224 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2225 {
2226 if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2227 for next_id in rows.flatten() {
2228 if next_id == to_id {
2229 let mut full_path = path.clone();
2230 full_path.push(next_id);
2231 all_paths.push(full_path);
2232 if all_paths.len() >= max_paths {
2233 break;
2234 }
2235 } else if !path.contains(&next_id) && path.len() < max_depth {
2236 let mut new_path = path.clone();
2237 new_path.push(next_id);
2238 queue.push_back((next_id, new_path));
2239 }
2240 }
2241 }
2242 }
2243 }
2244
2245 let mut named_paths: Vec<Vec<String>> = Vec::new();
2247 for path_ids in all_paths {
2248 let mut named = Vec::with_capacity(path_ids.len());
2249 for id in path_ids {
2250 if let Ok(name) = conn.query_row(
2251 "SELECT name FROM entity WHERE id = ?1",
2252 params![id],
2253 |row| row.get::<_, String>(0),
2254 ) {
2255 named.push(name);
2256 }
2257 }
2258 named_paths.push(named);
2259 }
2260
2261 Ok(named_paths)
2262 }
2263
2264 pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2269 let conn = self.readers.get();
2270 conn.query_row(
2273 "SELECT json_object(
2274 'entities', COALESCE((
2275 SELECT json_group_array(json_object(
2276 'name', e.name,
2277 'entityType', t.name,
2278 'observations', COALESCE((
2279 SELECT json_group_array(o.body ORDER BY o.idx)
2280 FROM observation o WHERE o.entity_id = e.id
2281 ), json('[]'))
2282 ) ORDER BY e.id)
2283 FROM (
2284 SELECT id, name, type_id FROM entity
2285 WHERE flags = 0 ORDER BY id LIMIT ?1
2286 ) e
2287 JOIN type_dict t ON t.id = e.type_id
2288 ), json('[]')),
2289 'relations', COALESCE((
2290 SELECT json_group_array(json_object(
2291 'from', e1.name,
2292 'to', e2.name,
2293 'relationType', t.name
2294 ))
2295 FROM (
2296 SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2297 ) r
2298 JOIN entity e1 ON e1.id = r.from_id
2299 JOIN entity e2 ON e2.id = r.to_id
2300 JOIN type_dict t ON t.id = r.type_id
2301 WHERE e1.flags = 0 AND e2.flags = 0
2302 ), json('[]'))
2303 )",
2304 params![max_rows],
2305 |row| row.get::<_, String>(0),
2306 )
2307 .map_err(sqlite_err)
2308 }
2309
2310 pub fn wipe(&self) -> Result<()> {
2311 let conn = self.writer.lock();
2312 conn.execute_batch(
2317 "DELETE FROM observation;
2318 DELETE FROM relation;
2319 DELETE FROM entity;
2320 DELETE FROM type_dict;
2321 INSERT INTO name_fts(name_fts) VALUES('delete-all');
2322 INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2323 UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2324 UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2325 )
2326 .map_err(sqlite_err)?;
2327 self.seq_entity.store(0, Ordering::Relaxed);
2328 self.seq_obs.store(0, Ordering::Relaxed);
2329 self.cache.lock().clear();
2330 Ok(())
2331 }
2332
2333 pub fn run_maintenance(&self) -> Result<()> {
2336 let conn = self.writer.lock();
2337
2338 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2339 .map_err(sqlite_err)?;
2340
2341 conn.execute_batch("PRAGMA optimize(0x10000);")
2342 .map_err(sqlite_err)?;
2343
2344 conn.execute_batch(
2345 "INSERT INTO name_fts(name_fts) VALUES('optimize');
2346 INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2347 )
2348 .map_err(sqlite_err)?;
2349
2350 Ok(())
2351 }
2352
2353 fn _traverse(
2354 &self,
2355 name: &str,
2356 direction: Direction,
2357 rtype: Option<&str>,
2358 depth: u32,
2359 _include_relations: bool,
2361 ) -> Result<String> {
2362 let conn = self.readers.get();
2363 let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2364 Some(v) => v,
2365 None => {
2366 return Err(MCSError::InvalidParams(format!(
2367 "Entity '{name}' not found"
2368 )))
2369 }
2370 };
2371
2372 let mut all_ids: HashSet<i64> = HashSet::new();
2373 let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2374 let mut frontier: HashSet<i64> = HashSet::new();
2375 all_ids.insert(start_id);
2376 frontier.insert(start_id);
2377
2378 let type_filter: Option<i64> = rtype
2384 .filter(|rt| !rt.is_empty())
2385 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2386
2387 let mut q_out_t = conn.prepare_cached(
2389 "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2390 let mut q_out = conn.prepare_cached(
2391 "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2392 let mut q_in_t = conn.prepare_cached(
2393 "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2394 let mut q_in = conn.prepare_cached(
2395 "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2396
2397 let mut cur_depth = 0u32;
2398 while cur_depth < depth && !frontier.is_empty() {
2399 let mut next_frontier: HashSet<i64> = HashSet::new();
2400
2401 for &fid in &frontier {
2402 if direction == Direction::Outgoing || direction == Direction::Both {
2403 if let Some(tid) = type_filter {
2404 if let Ok(ref mut stmt) = q_out_t {
2405 if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2406 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2407 }) {
2408 for row in rows.flatten() {
2409 let (to_id, t_id) = row;
2410 all_rels.insert((fid, to_id, t_id));
2411 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2412 }
2413 }
2414 }
2415 } else if let Ok(ref mut stmt) = q_out {
2416 if let Ok(rows) = stmt.query_map(params![fid], |row| {
2417 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2418 }) {
2419 for row in rows.flatten() {
2420 let (to_id, t_id) = row;
2421 all_rels.insert((fid, to_id, t_id));
2422 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2423 }
2424 }
2425 }
2426 }
2427
2428 if direction == Direction::Incoming || direction == Direction::Both {
2429 if let Some(tid) = type_filter {
2430 if let Ok(ref mut stmt) = q_in_t {
2431 if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2432 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2433 }) {
2434 for row in rows.flatten() {
2435 let (from_id, t_id) = row;
2436 all_rels.insert((from_id, fid, t_id));
2437 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2438 }
2439 }
2440 }
2441 } else if let Ok(ref mut stmt) = q_in {
2442 if let Ok(rows) = stmt.query_map(params![fid], |row| {
2443 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2444 }) {
2445 for row in rows.flatten() {
2446 let (from_id, t_id) = row;
2447 all_rels.insert((from_id, fid, t_id));
2448 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2449 }
2450 }
2451 }
2452 }
2453 }
2454
2455 frontier = next_frontier;
2456 cur_depth += 1;
2457 }
2458
2459 let entities_json: String = {
2460 if all_ids.is_empty() {
2461 "[]".to_string()
2462 } else {
2463 let ids: Vec<i64> = all_ids.iter().copied().collect();
2464 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2465 let sql = format!(
2466 "SELECT COALESCE(json_group_array(json_object(
2467 'name', e.name,
2468 'entityType', t.name,
2469 'observations', COALESCE((
2470 SELECT json_group_array(o.body ORDER BY o.idx)
2471 FROM observation o WHERE o.entity_id = e.id
2472 ), json('[]'))
2473 ) ORDER BY e.id), json('[]'))
2474 FROM entity e
2475 JOIN type_dict t ON t.id = e.type_id
2476 WHERE e.id IN ({}) AND e.flags = 0",
2477 placeholders.join(",")
2478 );
2479 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2480 row.get::<_, String>(0)
2481 })
2482 .unwrap_or_else(|_| "[]".to_string())
2483 }
2484 };
2485
2486 let relations_json: String = {
2487 if all_rels.is_empty() {
2488 "[]".to_string()
2489 } else {
2490 let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2491 let sql = format!(
2492 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2493 SELECT COALESCE(json_group_array(json_object(
2494 'from', e1.name,
2495 'to', e2.name,
2496 'relationType', t.name
2497 )), json('[]'))
2498 FROM r
2499 JOIN entity e1 ON e1.id = r.from_id
2500 JOIN entity e2 ON e2.id = r.to_id
2501 JOIN type_dict t ON t.id = r.type_id
2502 WHERE e1.flags = 0 AND e2.flags = 0",
2503 vals.join(", ")
2504 );
2505 let params: Vec<&dyn ToSql> = all_rels.iter()
2506 .flat_map(|(f, t, tp)| {
2507 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2508 })
2509 .collect();
2510 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2511 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2512 .unwrap_or_else(|_| "[]".to_string())
2513 }
2514 };
2515
2516 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2517 out.push_str("{\"entities\":");
2518 out.push_str(&entities_json);
2519 out.push_str(",\"relations\":");
2520 out.push_str(&relations_json);
2521 out.push('}');
2522 Ok(out)
2523 }
2524}
2525
2526#[cfg(test)]
2529mod tests {
2530 use super::*;
2531 use serde_json::Value;
2532 use std::ops::Deref;
2533 use std::path::PathBuf;
2534
2535 struct TestKg(GraphHandle, PathBuf);
2536
2537 impl Deref for TestKg {
2538 type Target = GraphHandle;
2539 fn deref(&self) -> &GraphHandle {
2540 &self.0
2541 }
2542 }
2543
2544 impl Drop for TestKg {
2545 fn drop(&mut self) {
2546 cleanup_db(&self.1);
2547 }
2548 }
2549
2550 fn cleanup_db(path: &std::path::Path) {
2551 let _ = std::fs::remove_file(path);
2552 let _ = std::fs::remove_file(path.with_extension("db-wal"));
2553 let _ = std::fs::remove_file(path.with_extension("db-shm"));
2554 }
2555
2556 fn new_kg() -> TestKg {
2557 use std::sync::atomic::AtomicU64;
2558 use std::sync::atomic::Ordering;
2559 static COUNTER: AtomicU64 = AtomicU64::new(0);
2560 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2561 let dir = std::env::temp_dir();
2562 let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2563 cleanup_db(&path);
2564 let kg = GraphHandle::new(&path, Durability::Async, 268435456, NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2565 TestKg(kg, path)
2566 }
2567
2568 #[test]
2569 fn test_create_and_get_entity() {
2570 let kg = new_kg();
2571 let entities = vec![Entity {
2572 name: "test".into(),
2573 entity_type: "person".into(),
2574 observations: vec!["obs1".into(), "obs2".into()],
2575 }];
2576 let created = kg.create_entities(&entities).unwrap();
2577 assert_eq!(created.len(), 1);
2578
2579 let got = kg.get_entity("test").unwrap().unwrap();
2580 assert_eq!(got.name, "test");
2581 assert_eq!(got.entity_type, "person");
2582 assert_eq!(got.observations, vec!["obs1", "obs2"]);
2583 }
2584
2585 #[test]
2586 fn test_get_nonexistent() {
2587 let kg = new_kg();
2588 assert!(kg.get_entity("nonexistent").unwrap().is_none());
2589 }
2590
2591 #[test]
2592 fn test_delete_entity() {
2593 let kg = new_kg();
2594 kg.create_entities(&[Entity {
2595 name: "del".into(),
2596 entity_type: "t".into(),
2597 observations: vec![],
2598 }])
2599 .unwrap();
2600 assert!(kg.get_entity("del").unwrap().is_some());
2601 kg.delete_entities(&["del".to_string()]).unwrap();
2602 assert!(kg.get_entity("del").unwrap().is_none());
2603 }
2604
2605 #[test]
2606 fn test_add_and_delete_observations() {
2607 let kg = new_kg();
2608 kg.create_entities(&[Entity {
2609 name: "obs_test".into(),
2610 entity_type: "t".into(),
2611 observations: vec!["a".into()],
2612 }])
2613 .unwrap();
2614
2615 let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2616 assert_eq!(added.len(), 2);
2617
2618 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2619 assert!(ent.observations.contains(&"b".into()));
2620 assert!(ent.observations.contains(&"c".into()));
2621
2622 kg.delete_observations("obs_test", &["b".into()]).unwrap();
2623 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2624 assert!(!ent.observations.contains(&"b".into()));
2625 assert!(ent.observations.contains(&"c".into()));
2626 assert!(ent.observations.contains(&"a".into()));
2627 }
2628
2629 #[test]
2630 fn test_create_relations() {
2631 let kg = new_kg();
2632 kg.create_entities(&[
2633 Entity {
2634 name: "A".into(),
2635 entity_type: "node".into(),
2636 observations: vec![],
2637 },
2638 Entity {
2639 name: "B".into(),
2640 entity_type: "node".into(),
2641 observations: vec![],
2642 },
2643 ])
2644 .unwrap();
2645
2646 let rels = kg
2647 .create_relations(&[Relation {
2648 from: "A".into(),
2649 to: "B".into(),
2650 relation_type: "edge".into(),
2651 }])
2652 .unwrap();
2653 assert_eq!(rels.len(), 1);
2654
2655 assert_eq!(kg.get_entity_count().unwrap(), 2);
2656 assert_eq!(kg.get_relation_count().unwrap(), 1);
2657 }
2658
2659 #[test]
2660 fn test_search_nodes() {
2661 let kg = new_kg();
2662 kg.create_entities(&[Entity {
2663 name: "Einstein".into(),
2664 entity_type: "scientist".into(),
2665 observations: vec!["physics".into(), "relativity".into()],
2666 }])
2667 .unwrap();
2668
2669 let results = kg.search_nodes_filtered("physics", None, 0, 10);
2670 assert_eq!(results.len(), 1);
2671 assert_eq!(results[0].name, "Einstein");
2672
2673 let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2674 assert_eq!(results.len(), 1);
2675
2676 let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2677 assert_eq!(results.len(), 0);
2678 }
2679
2680 #[test]
2681 fn test_find_path() {
2682 let kg = new_kg();
2683 kg.create_entities(&[
2684 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2685 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2686 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2687 ]).unwrap();
2688
2689 kg.create_relations(&[
2690 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2691 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2692 ]).unwrap();
2693
2694 let path = kg.find_path("A", "C").unwrap().unwrap();
2695 assert_eq!(path, vec!["A", "B", "C"]);
2696 }
2697
2698 #[test]
2699 fn test_degree() {
2700 let kg = new_kg();
2701 kg.create_entities(&[
2702 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2703 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2704 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2705 ]).unwrap();
2706
2707 kg.create_relations(&[
2708 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2709 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2710 ]).unwrap();
2711
2712 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2713 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2714 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2715 }
2716
2717 #[test]
2718 fn test_neighbors() {
2719 let kg = new_kg();
2720 kg.create_entities(&[
2721 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2722 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2723 ]).unwrap();
2724
2725 kg.create_relations(&[Relation {
2726 from: "A".into(), to: "B".into(), relation_type: "e".into(),
2727 }]).unwrap();
2728
2729 let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2730 let v: Value = serde_json::from_str(&result).unwrap();
2731 assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2732 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2733 }
2734
2735 #[test]
2736 fn test_open_nodes() {
2737 let kg = new_kg();
2738 kg.create_entities(&[
2739 Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2740 Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2741 ]).unwrap();
2742
2743 kg.create_relations(&[Relation {
2744 from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2745 }]).unwrap();
2746
2747 let result = kg.open_nodes(&["X".into()]);
2748 let v: Value = serde_json::from_str(&result).unwrap();
2749 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2750 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2751 }
2752
2753 #[test]
2754 fn test_entities_exist() {
2755 let kg = new_kg();
2756 kg.create_entities(&[Entity {
2757 name: "exists".into(), entity_type: "t".into(), observations: vec![],
2758 }]).unwrap();
2759
2760 let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2761 assert_eq!(res, vec![true, false]);
2762 }
2763
2764 #[test]
2765 fn test_describe_entity() {
2766 let kg = new_kg();
2767 kg.create_entities(&[Entity {
2768 name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2769 }]).unwrap();
2770
2771 let entity = kg.describe_entity("desc").unwrap();
2772 assert_eq!(entity.name, "desc");
2773 }
2774
2775 #[test]
2776 fn test_entity_type_counts() {
2777 let kg = new_kg();
2778 kg.create_entities(&[
2779 Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2780 Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2781 Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2782 ]).unwrap();
2783
2784 let counts = kg.entity_type_counts();
2785 let map: FxHashMap<_, _> = counts.into_iter().collect();
2786 assert_eq!(map.get("person"), Some(&2));
2787 assert_eq!(map.get("place"), Some(&1));
2788 }
2789
2790 #[test]
2791 fn test_relation_type_counts() {
2792 let kg = new_kg();
2793 kg.create_entities(&[
2794 Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2795 Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2796 Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2797 ]).unwrap();
2798
2799 kg.create_relations(&[
2800 Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2801 Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2802 ]).unwrap();
2803
2804 let counts = kg.relation_type_counts();
2805 let map: FxHashMap<_, _> = counts.into_iter().collect();
2806 assert_eq!(map.get("knows"), Some(&2));
2807 }
2808
2809 #[test]
2810 fn test_upsert_entities() {
2811 let kg = new_kg();
2812 kg.create_entities(&[Entity {
2813 name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2814 }]).unwrap();
2815
2816 kg.upsert_entities(&[Entity {
2818 name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2819 }]).unwrap();
2820
2821 let ent = kg.get_entity("u").unwrap().unwrap();
2822 assert_eq!(ent.entity_type, "new");
2823 assert!(ent.observations.contains(&"added".into()));
2824 assert!(ent.observations.contains(&"existing".into()));
2825 }
2826
2827 #[test]
2828 fn test_merge_entities() {
2829 let kg = new_kg();
2830 kg.create_entities(&[
2831 Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2832 Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2833 ]).unwrap();
2834
2835 kg.create_relations(&[Relation {
2836 from: "source".into(), to: "target".into(), relation_type: "e".into(),
2837 }]).unwrap();
2838
2839 let merged = kg.merge_entities("source", "target").unwrap();
2840 assert_eq!(merged.name, "target");
2841 assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2842 }
2843
2844 #[test]
2845 fn test_find_all_paths() {
2846 let kg = new_kg();
2847 kg.create_entities(&[
2848 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2849 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2850 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2851 ]).unwrap();
2852
2853 kg.create_relations(&[
2854 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2855 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2856 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2857 ]).unwrap();
2858
2859 let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2860 assert!(paths.len() >= 2);
2861 }
2862
2863 #[test]
2864 fn test_batch_get_entities() {
2865 let kg = new_kg();
2866 kg.create_entities(&[
2867 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2868 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2869 ]).unwrap();
2870
2871 let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2872 assert_eq!(results.len(), 3);
2873 assert!(results[0].is_some());
2874 assert!(results[1].is_none());
2875 assert!(results[2].is_some());
2876 }
2877
2878 #[test]
2879 fn test_export_graph() {
2880 let kg = new_kg();
2881 kg.create_entities(&[Entity {
2882 name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2883 }]).unwrap();
2884
2885 let exported = kg.export("json", i64::MAX).unwrap();
2886 assert!(exported.contains("exp"));
2887 assert!(exported.contains("o"));
2888 }
2889
2890 #[test]
2891 fn test_graph_stats() {
2892 let kg = new_kg();
2893 assert_eq!(kg.get_entity_count().unwrap(), 0);
2894 assert_eq!(kg.get_relation_count().unwrap(), 0);
2895
2896 kg.create_entities(&[Entity {
2897 name: "s".into(), entity_type: "t".into(), observations: vec![],
2898 }]).unwrap();
2899
2900 assert_eq!(kg.get_entity_count().unwrap(), 1);
2901 }
2902
2903 #[test]
2904 fn test_read_graph_filtered() {
2905 let kg = new_kg();
2906 kg.create_entities(&[
2907 Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2908 Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2909 ]).unwrap();
2910
2911 let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2912 let v: Value = serde_json::from_str(&out).unwrap();
2913 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2914 assert_eq!(v["entities"][0]["name"], "p1");
2915 }
2916
2917 #[test]
2918 fn test_wipe() {
2919 let kg = new_kg();
2920 kg.create_entities(&[Entity {
2921 name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2922 }]).unwrap();
2923 assert_eq!(kg.get_entity_count().unwrap(), 1);
2924
2925 kg.wipe().unwrap();
2926 assert_eq!(kg.get_entity_count().unwrap(), 0);
2927 }
2928
2929 #[test]
2930 fn test_push_json_str() {
2931 let mut buf = String::new();
2932 push_json_str(&mut buf, "hello");
2933 assert_eq!(buf, "\"hello\"");
2934 let mut buf = String::new();
2935 push_json_str(&mut buf, "he\"llo");
2936 assert_eq!(buf, "\"he\\\"llo\"");
2937 }
2938
2939 #[test]
2942 fn test_create_entities_empty_input() {
2943 let kg = new_kg();
2944 let created = kg.create_entities(&[]).unwrap();
2945 assert!(created.is_empty());
2946 }
2947
2948 #[test]
2949 fn test_create_entities_skip_empty_name() {
2950 let kg = new_kg();
2951 let created = kg.create_entities(&[Entity {
2952 name: "".into(),
2953 entity_type: "t".into(),
2954 observations: vec![],
2955 }])
2956 .unwrap();
2957 assert!(created.is_empty());
2958 assert_eq!(kg.get_entity_count().unwrap(), 0);
2959 }
2960
2961 #[test]
2962 fn test_create_entities_duplicate_names() {
2963 let kg = new_kg();
2964 let e = Entity {
2965 name: "dup".into(),
2966 entity_type: "t".into(),
2967 observations: vec!["obs".into()],
2968 };
2969 let first = kg.create_entities(&[e.clone()]).unwrap();
2970 assert_eq!(first.len(), 1);
2971 let second = kg.create_entities(&[e.clone()]).unwrap();
2972 assert!(second.is_empty());
2973 assert_eq!(kg.get_entity_count().unwrap(), 1);
2974 }
2975
2976 #[test]
2977 fn test_create_entities_partial_duplicates() {
2978 let kg = new_kg();
2979 let created = kg.create_entities(&[
2980 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2981 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2982 ]).unwrap();
2983 assert_eq!(created.len(), 2);
2984
2985 let second = kg.create_entities(&[
2986 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2987 Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
2988 ]).unwrap();
2989 assert_eq!(second.len(), 1); assert_eq!(second[0].name, "c");
2991 assert_eq!(kg.get_entity_count().unwrap(), 3);
2992 }
2993
2994 #[test]
2995 fn test_create_entities_mixed_empty_and_valid() {
2996 let kg = new_kg();
2997 let created = kg.create_entities(&[
2998 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
2999 Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3000 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3001 ]).unwrap();
3002 assert_eq!(created.len(), 1);
3003 assert_eq!(created[0].name, "valid");
3004 assert_eq!(kg.get_entity_count().unwrap(), 1);
3005 }
3006
3007 #[test]
3008 fn test_create_entities_same_name_in_batch() {
3009 let kg = new_kg();
3010 let created = kg.create_entities(&[
3011 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3012 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3013 ]).unwrap();
3014 assert_eq!(created.len(), 1);
3015 assert_eq!(kg.get_entity_count().unwrap(), 1);
3016 }
3017
3018 #[test]
3021 fn test_create_relations_empty_input() {
3022 let kg = new_kg();
3023 let rels = kg.create_relations(&[]).unwrap();
3024 assert!(rels.is_empty());
3025 }
3026
3027 #[test]
3028 fn test_create_relations_nonexistent_from() {
3029 let kg = new_kg();
3030 kg.create_entities(&[Entity {
3031 name: "B".into(), entity_type: "t".into(), observations: vec![],
3032 }]).unwrap();
3033
3034 let rels = kg.create_relations(&[Relation {
3035 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3036 }]).unwrap();
3037 assert!(rels.is_empty());
3038 assert_eq!(kg.get_relation_count().unwrap(), 0);
3039 }
3040
3041 #[test]
3042 fn test_create_relations_nonexistent_to() {
3043 let kg = new_kg();
3044 kg.create_entities(&[Entity {
3045 name: "A".into(), entity_type: "t".into(), observations: vec![],
3046 }]).unwrap();
3047
3048 let rels = kg.create_relations(&[Relation {
3049 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3050 }]).unwrap();
3051 assert!(rels.is_empty());
3052 assert_eq!(kg.get_relation_count().unwrap(), 0);
3053 }
3054
3055 #[test]
3056 fn test_create_relations_both_nonexistent() {
3057 let kg = new_kg();
3058 let rels = kg.create_relations(&[Relation {
3059 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3060 }]).unwrap();
3061 assert!(rels.is_empty());
3062 }
3063
3064 #[test]
3065 fn test_create_relations_self_loop() {
3066 let kg = new_kg();
3067 kg.create_entities(&[Entity {
3068 name: "self".into(), entity_type: "t".into(), observations: vec![],
3069 }]).unwrap();
3070
3071 let rels = kg.create_relations(&[Relation {
3072 from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3073 }]).unwrap();
3074 assert_eq!(rels.len(), 1);
3075 assert_eq!(kg.get_relation_count().unwrap(), 1);
3076 assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3077 assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3078 }
3079
3080 #[test]
3081 fn test_create_relations_duplicate() {
3082 let kg = new_kg();
3083 kg.create_entities(&[
3084 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3085 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3086 ]).unwrap();
3087
3088 let r = Relation {
3089 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3090 };
3091 let first = kg.create_relations(&[r.clone()]).unwrap();
3092 assert_eq!(first.len(), 1);
3093
3094 let second = kg.create_relations(&[r.clone()]).unwrap();
3095 assert!(second.is_empty());
3096 assert_eq!(kg.get_relation_count().unwrap(), 1);
3097 }
3098
3099 #[test]
3100 fn test_create_relations_new_type_auto_created() {
3101 let kg = new_kg();
3102 kg.create_entities(&[
3103 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3104 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3105 ]).unwrap();
3106
3107 let rels = kg.create_relations(&[Relation {
3108 from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3109 }]).unwrap();
3110 assert_eq!(rels.len(), 1);
3111
3112 let counts = kg.relation_type_counts();
3113 let map: FxHashMap<_, _> = counts.into_iter().collect();
3114 assert_eq!(map.get("brand_new_type"), Some(&1));
3115 }
3116
3117 #[test]
3118 fn test_create_relations_degree_updates() {
3119 let kg = new_kg();
3120 kg.create_entities(&[
3121 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3122 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3123 Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3124 ]).unwrap();
3125
3126 kg.create_relations(&[
3127 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3128 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3129 ]).unwrap();
3130
3131 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3132 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3133 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3134 assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3135 assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3136 }
3137
3138 #[test]
3139 fn test_create_relations_delete_and_recreate() {
3140 let kg = new_kg();
3141 kg.create_entities(&[
3142 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3143 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3144 ]).unwrap();
3145
3146 let r = Relation {
3147 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3148 };
3149 kg.create_relations(&[r.clone()]).unwrap();
3150 assert_eq!(kg.get_relation_count().unwrap(), 1);
3151
3152 kg.delete_relations(&[r.clone()]).unwrap();
3153 assert_eq!(kg.get_relation_count().unwrap(), 0);
3154
3155 let re = kg.create_relations(&[r.clone()]).unwrap();
3157 assert_eq!(re.len(), 1);
3158 assert_eq!(kg.get_relation_count().unwrap(), 1);
3159 }
3160
3161 #[test]
3164 fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3165 let kg = new_kg();
3166 kg.create_entities(&[
3167 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3168 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3169 ]).unwrap();
3170 kg.create_relations(&[
3171 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3172 ]).unwrap();
3173
3174 assert_eq!(kg.get_relation_count().unwrap(), 1);
3175
3176 kg.delete_entities(&["A".into()]).unwrap();
3178 assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3179 assert_eq!(kg.get_relation_count().unwrap(), 0);
3180 }
3181
3182 #[test]
3183 fn test_graph_stats_after_entity_with_observations() {
3184 let kg = new_kg();
3185 kg.create_entities(&[Entity {
3186 name: "stat".into(), entity_type: "t".into(),
3187 observations: vec!["o1".into(), "o2".into(), "o3".into()],
3188 }]).unwrap();
3189
3190 let ecount = kg.get_entity_count().unwrap();
3191 assert_eq!(ecount, 1);
3193
3194 kg.delete_entities(&["stat".into()]).unwrap();
3196 assert_eq!(kg.get_entity_count().unwrap(), 0);
3197 }
3198
3199 fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3202 use std::sync::atomic::AtomicU64;
3203 static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3204 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3205 let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3206 cleanup_db(&path);
3207 let kg = GraphHandle::new(
3208 &path,
3209 Durability::Async,
3210 268_435_456,
3211 NonZeroUsize::new(10_000).unwrap(),
3212 read_pool_size,
3213 )
3214 .expect("create KG");
3215 TestKg(kg, path)
3216 }
3217
3218 fn seed_line(kg: &GraphHandle, n: usize) {
3219 let entities: Vec<Entity> = (0..n)
3220 .map(|i| Entity {
3221 name: format!("n{i}"),
3222 entity_type: "node".into(),
3223 observations: vec![format!("obs of n{i}")],
3224 })
3225 .collect();
3226 kg.create_entities(&entities).unwrap();
3227 let rels: Vec<Relation> = (0..n.saturating_sub(1))
3228 .map(|i| Relation {
3229 from: format!("n{i}"),
3230 to: format!("n{}", i + 1),
3231 relation_type: "edge".into(),
3232 })
3233 .collect();
3234 if !rels.is_empty() {
3235 kg.create_relations(&rels).unwrap();
3236 }
3237 }
3238
3239 fn count_relations(graph_json: &str) -> usize {
3240 let v: Value = serde_json::from_str(graph_json).unwrap();
3241 v["relations"].as_array().unwrap().len()
3242 }
3243
3244 fn count_entities(graph_json: &str) -> usize {
3245 let v: Value = serde_json::from_str(graph_json).unwrap();
3246 v["entities"].as_array().unwrap().len()
3247 }
3248
/// Smoke test for a graph built with `new_kg_with_pool(1)`: after seeding five
/// chained nodes, the entity count, a lookup by name, and a full graph read
/// must all succeed and agree on five entities.
#[test]
fn test_pool_size_one_still_works() {
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 5);

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 5);

    let middle = kg.get_entity("n2").unwrap();
    assert!(middle.is_some());

    let graph = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&graph), 5);
}
3260
/// Creates an entity and immediately reads it back by name, expecting the
/// stored observation to be returned.
#[test]
fn test_reads_see_committed_writes() {
    let kg = new_kg_with_pool(4);

    let fresh = Entity {
        name: "fresh".into(),
        entity_type: "t".into(),
        observations: vec!["v".into()],
    };
    kg.create_entities(&[fresh]).unwrap();

    let fetched = kg.get_entity("fresh").unwrap().unwrap();
    assert_eq!(fetched.observations, vec!["v"]);
}
3276
/// Runs eight reader threads alongside one writer thread on a graph seeded
/// with 50 chained nodes.
///
/// Reader results are discarded, so the readers only demonstrate that the
/// calls return without panicking while writes are in flight. Once the scope
/// has joined every thread, the test expects 110 entities (50 seeded plus the
/// 60 written as `w100..w159`) and that `w159` can be looked up.
#[test]
fn test_concurrent_readers_consistent() {
    let kg = new_kg_with_pool(4);
    seed_line(&kg, 50);

    std::thread::scope(|scope| {
        for _reader in 0..8 {
            scope.spawn(|| {
                (0..200).for_each(|_| {
                    let _ = kg.get_entity("n10");
                    let _ = kg.search_nodes_filtered("obs", None, 0, 10);
                    let _ = kg.read_graph_filtered(None, 0, 100);
                    let _ = kg.get_entity_count();
                    let _ = kg.neighbors("n10", Direction::Both, None, 2);
                });
            });
        }

        // Single writer: one entity per call, each of which must succeed.
        scope.spawn(|| {
            for id in 100..160 {
                let written = Entity {
                    name: format!("w{id}"),
                    entity_type: "node".into(),
                    observations: vec![format!("w obs {id}")],
                };
                kg.create_entities(&[written]).unwrap();
            }
        });
    });

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 110);

    let last_written = kg.get_entity("w159").unwrap();
    assert!(last_written.is_some());
}
3314
/// Hammers a graph built with `new_kg_with_pool(1)` with reads from four
/// threads (100 `read_graph_filtered` calls each, results discarded) and then
/// expects the entity count to still be the 3 seeded nodes.
///
/// NOTE(review): despite the name, this test only issues reads; it never
/// attempts a write through a reader connection.
#[test]
fn test_reader_pool_rejects_writes_internally() {
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 3);

    std::thread::scope(|scope| {
        for _reader in 0..4 {
            scope.spawn(|| {
                (0..100).for_each(|_| {
                    let _ = kg.read_graph_filtered(None, 0, 10);
                });
            });
        }
    });

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 3);
}
3335
/// On a four-node chain, reads the graph from offset 0 with different limits
/// and checks how many entities and relations come back for each.
///
/// The expectations are: unlimited -> 4 entities / 3 relations,
/// limit 1 -> 1 entity / 0 relations, limit 2 -> 2 entities / 1 relation.
#[test]
fn test_read_graph_relations_scoped_to_page() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 4);

    // (limit, expected entity count, expected relation count)
    let cases = [(usize::MAX, 4, 3), (1, 1, 0), (2, 2, 1)];
    for (limit, want_entities, want_relations) in cases {
        let page = kg.read_graph_filtered(None, 0, limit).unwrap();
        assert_eq!(count_entities(&page), want_entities);
        assert_eq!(count_relations(&page), want_relations);
    }
}
3361
/// Reads a page of two entities starting at offset 2 from a five-node chain
/// and expects the output to mention `n2` and `n3` but neither `n0` nor `n1`.
#[test]
fn test_read_graph_pagination_offset() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    let page = kg.read_graph_filtered(None, 2, 2).unwrap();
    assert_eq!(count_entities(&page), 2);

    // Names are matched as quoted JSON strings so `n1` cannot match e.g. `n10`.
    for skipped in ["\"n0\"", "\"n1\""] {
        assert!(!page.contains(skipped));
    }
    for present in ["\"n2\"", "\"n3\""] {
        assert!(page.contains(present));
    }
}
3374
/// Reading a graph with nothing in it must yield exactly the empty JSON
/// document with empty `entities` and `relations` arrays.
#[test]
fn test_read_graph_empty() {
    let kg = new_kg_with_pool(2);

    let expected = r#"{"entities":[],"relations":[]}"#;
    let actual = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(actual, expected);
}
3381
/// Creates two `person` entities and one `place`, then reads the graph
/// filtered to `person` and expects only the two people in the output.
#[test]
fn test_read_graph_filtered_by_type() {
    let kg = new_kg_with_pool(2);

    // (name, entity type), kept in the same insertion order as before.
    let fixtures = [("p1", "person"), ("q1", "place"), ("p2", "person")];
    let entities: Vec<Entity> = fixtures
        .iter()
        .map(|&(name, kind)| Entity {
            name: name.into(),
            entity_type: kind.into(),
            observations: vec![],
        })
        .collect();
    kg.create_entities(&entities).unwrap();

    let people = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&people), 2);
    for expected in ["\"p1\"", "\"p2\""] {
        assert!(people.contains(expected));
    }
    assert!(!people.contains("\"q1\""));
}
3397
/// Exports a five-node chain as JSON with and without a row cap.
///
/// With `i64::MAX` the test expects all 5 entities and 4 relations; with a cap
/// of 2 it expects 2 entities and 2 relations.
#[test]
fn test_export_respects_max_rows() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    let uncapped = kg.export("json", i64::MAX).unwrap();
    let uncapped_entities = count_entities(&uncapped);
    assert_eq!(uncapped_entities, 5);
    let uncapped_relations = count_relations(&uncapped);
    assert_eq!(uncapped_relations, 4);

    let limited = kg.export("json", 2).unwrap();
    let limited_entities = count_entities(&limited);
    assert_eq!(limited_entities, 2);
    let limited_relations = count_relations(&limited);
    assert_eq!(limited_relations, 2);
}
3413
/// Exports a three-node chain with `max_rows = -1` and expects all three
/// entities to be present, i.e. a negative cap does not truncate the export.
#[test]
fn test_export_negative_max_rows_is_unbounded() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let exported = kg.export("json", -1).unwrap();
    let entity_total = count_entities(&exported);
    assert_eq!(entity_total, 3);
}
3422
/// Writes 100 entities one call at a time, then expects the count to be 100
/// and a search for `e57` to return an entity with that exact name.
#[test]
fn test_many_small_write_batches_stay_consistent() {
    let kg = new_kg_with_pool(2);

    (0..100).for_each(|i| {
        let single = Entity {
            name: format!("e{i}"),
            entity_type: "t".into(),
            observations: vec![format!("o{i}")],
        };
        kg.create_entities(&[single]).unwrap();
    });

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 100);

    let hits = kg.search_nodes_filtered("e57", None, 0, 10);
    let found = hits.iter().any(|hit| hit.name == "e57");
    assert!(found);
}
3442
/// Checks that `wipe` removes an entity from both name and observation search.
///
/// Before the wipe, searching for the name ("Einstein") and for the
/// observation text ("physics") each return one hit; afterwards the entity
/// count is 0 and both searches come back empty.
#[test]
fn test_wipe_clears_name_and_obs_fts() {
    let kg = new_kg_with_pool(2);

    let einstein = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    };
    kg.create_entities(&[einstein]).unwrap();

    let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(by_name.len(), 1);
    let by_obs = kg.search_nodes_filtered("physics", None, 0, 10);
    assert_eq!(by_obs.len(), 1);

    kg.wipe().unwrap();

    let remaining = kg.get_entity_count().unwrap();
    assert_eq!(remaining, 0);
    let by_name_after = kg.search_nodes_filtered("Einstein", None, 0, 10);
    assert!(by_name_after.is_empty());
    let by_obs_after = kg.search_nodes_filtered("physics", None, 0, 10);
    assert!(by_obs_after.is_empty());
}
3467
/// Creates "Einstein", wipes the graph, then creates "Einstein" again with an
/// extra observation.
///
/// Afterwards a name search must return exactly one hit, a search for the new
/// observation ("relativity") must return one hit, and the entity count must
/// be 1.
#[test]
fn test_wipe_then_recreate_search_works() {
    let kg = new_kg_with_pool(2);

    let original = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    };
    kg.create_entities(&[original]).unwrap();
    kg.wipe().unwrap();

    let recreated = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into(), "relativity".into()],
    };
    kg.create_entities(&[recreated]).unwrap();

    let name_hits = kg.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(name_hits.len(), 1, "exactly one Einstein after recreate");

    let obs_hits = kg.search_nodes_filtered("relativity", None, 0, 10);
    assert_eq!(obs_hits.len(), 1);

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 1);
}
3494
/// Searching relations by a type that was never used must return nothing, and
/// that type must not appear in `relation_type_counts` afterwards.
#[test]
fn test_search_relations_missing_type_returns_empty() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let matches = kg.search_relations(None, None, Some("does_not_exist"));
    assert!(matches.is_empty());

    let type_counts = kg.relation_type_counts();
    let listed = type_counts.iter().any(|(t, _)| t == "does_not_exist");
    assert!(!listed);
}
3509
/// Filtering relations by a `from` entity that does not exist ("ghost") must
/// yield an empty result rather than every relation.
#[test]
fn test_search_relations_missing_from_returns_empty() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let matches = kg.search_relations(Some("ghost"), None, None);
    assert!(
        matches.is_empty(),
        "missing 'from' must not match every relation"
    );
}
3517
/// Filtering a three-node chain by `from = n0` and type `edge` must return the
/// single relation `n0 -> n1`.
#[test]
fn test_search_relations_existing_filters_still_work() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let matches = kg.search_relations(Some("n0"), None, Some("edge"));
    assert_eq!(matches.len(), 1);

    let only = &matches[0];
    assert_eq!(only.from, "n0");
    assert_eq!(only.to, "n1");
}
3527
/// Asks for neighbours of `n0` restricted to a relation type that was never
/// used and expects the result to hold one entity and no relations.
#[test]
fn test_neighbors_missing_type_returns_only_start() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let result = kg.neighbors("n0", Direction::Both, Some("nonexistent"), 2);
    let json = result.unwrap();

    let entity_total = count_entities(&json);
    assert_eq!(entity_total, 1);
    let relation_total = count_relations(&json);
    assert_eq!(relation_total, 0);
}
3539
/// Builds `a -knows-> b` and `a -likes-> c`, then asks for outgoing `knows`
/// neighbours of `a`.
///
/// The result must mention `b`, must not mention `c`, and must contain exactly
/// one relation.
#[test]
fn test_neighbors_existing_type_filters() {
    let kg = new_kg_with_pool(2);

    let nodes: Vec<Entity> = ["a", "b", "c"]
        .iter()
        .map(|&name| Entity {
            name: name.into(),
            entity_type: "n".into(),
            observations: vec![],
        })
        .collect();
    kg.create_entities(&nodes).unwrap();

    let knows = Relation {
        from: "a".into(),
        to: "b".into(),
        relation_type: "knows".into(),
    };
    let likes = Relation {
        from: "a".into(),
        to: "c".into(),
        relation_type: "likes".into(),
    };
    kg.create_relations(&[knows, likes]).unwrap();

    let json = kg
        .neighbors("a", Direction::Outgoing, Some("knows"), 1)
        .unwrap();
    assert!(json.contains("\"b\""));
    assert!(!json.contains("\"c\""));

    let relation_total = count_relations(&json);
    assert_eq!(relation_total, 1);
}
3559}