Skip to main content

mcp_memory/
kg.rs

1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::Durability;
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17    MCSError::IoError(std::io::Error::other(e.to_string()))
18}
19
20fn is_not_found(e: &rusqlite::Error) -> bool {
21    matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
#[inline(always)]
fn now_us() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_micros() as i64)
}
31
/// 64-bit FNV-1a hash of `name`'s UTF-8 bytes, reinterpreted as `i64` so it
/// can be stored in an SQLite INTEGER column.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let digest = name
        .as_bytes()
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43    let mut stmt = conn
44        .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45        .map_err(sqlite_err)?;
46    let rows = stmt
47        .query_map(params![entity_id], |row| row.get::<_, String>(0))
48        .map_err(sqlite_err)?
49        .filter_map(|r| r.ok())
50        .collect::<Vec<_>>();
51    Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55    load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59    let h = name_hash(name);
60    let mut stmt = conn
61        .prepare_cached(
62            "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63        )
64        .map_err(sqlite_err)?;
65    match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66        Ok(id) => Ok(Some(id)),
67        Err(e) if is_not_found(&e) => Ok(None),
68        Err(e) => Err(sqlite_err(e)),
69    }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73    let mut sel = conn
74        .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75        .map_err(sqlite_err)?;
76    if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77        return Ok(id);
78    }
79    conn.execute(
80        "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81        params![kind, type_name],
82    )
83    .map_err(sqlite_err)?;
84    Ok(conn.last_insert_rowid())
85}
86
87/// Read-only type lookup. Unlike [`get_type_id`] this never inserts, so it is
88/// safe to call on a `query_only` reader connection. Returns `None` when the
89/// type does not exist.
90fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91    conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92        .ok()?
93        .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94        .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98    conn.execute(
99        "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100        params![delta, type_id],
101    )
102    .map_err(sqlite_err)?;
103    Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107    conn.execute(
108        "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109        params![delta, key],
110    )
111    .map_err(sqlite_err)?;
112    Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116    conn.query_row(
117        "SELECT value FROM graph_stat WHERE key = ?1",
118        params![key],
119        |row| row.get(0),
120    )
121    .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125    conn.query_row(
126        "SELECT name FROM type_dict WHERE id = ?1",
127        params![type_id],
128        |row| row.get(0),
129    )
130    .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134    let mut stmt = conn
135        .prepare_cached(
136            "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137        )
138        .map_err(sqlite_err)?;
139    let rows = stmt
140        .query_map(params![kind], |row| {
141            Ok((
142                row.get::<_, String>(0)?,
143                row.get::<_, i64>(1)? as usize,
144            ))
145        })
146        .map_err(sqlite_err)?
147        .filter_map(|r| r.ok())
148        .collect();
149    Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153    let mut stmt = conn
154        .prepare_cached(
155            "SELECT e.name, t.name,
156               COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157             FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158        )
159        .map_err(sqlite_err)?;
160    let (name, etype, obs_json): (String, String, String) = stmt
161        .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162        .map_err(sqlite_err)?;
163    let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164    Ok(Entity {
165        name,
166        entity_type: etype,
167        observations,
168    })
169}
170
/// Which relation edges to follow when traversing from an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Follow edges that start at the entity.
    Outgoing,
    /// Follow edges that end at the entity.
    Incoming,
    /// Follow edges in either direction.
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Only the exact, upper-case strings `"OUTGOING"` and `"INCOMING"` are
    /// recognised; `None` and every other value yield [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        let Some(keyword) = s else {
            return Direction::Both;
        };
        if keyword == "OUTGOING" {
            Direction::Outgoing
        } else if keyword == "INCOMING" {
            Direction::Incoming
        } else {
            Direction::Both
        }
    }
}
188
/// Appends `raw` to `buf` as a quoted, escaped JSON string literal.
///
/// Writing straight into the caller's buffer avoids allocating a temporary
/// `serde_json::Value` for the JSON-RPC wrapper.
///
/// `"` and `\` are backslash-escaped, `\n`, `\r`, `\t`, backspace and form
/// feed use their short escapes, and every other control byte below `0x20`
/// is written as `\u00XX`. All remaining bytes (including multi-byte UTF-8
/// sequences) are copied through unchanged.
///
/// Escaping is done in a single pass. The earlier two-pass version flushed
/// the pending run verbatim whenever it met a short-escape character, so a
/// control byte that appeared *before* such a character (e.g. `"\x01\""`)
/// was emitted raw, producing invalid JSON.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // `start` is the beginning of the pending run of bytes needing no escape.
    // It only ever advances past ASCII bytes, so slicing stays on char
    // boundaries.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        let short_escape: Option<u8> = match b {
            b'"' => Some(b'"'),
            b'\\' => Some(b'\\'),
            b'\n' => Some(b'n'),
            b'\r' => Some(b'r'),
            b'\t' => Some(b't'),
            0x08 => Some(b'b'),
            0x0C => Some(b'f'),
            // Remaining control characters have no short form.
            0x00..=0x1F => None,
            _ => continue,
        };
        buf.push_str(&raw[start..i]);
        match short_escape {
            Some(esc) => {
                buf.push('\\');
                buf.push(esc as char);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the six-character `\u00XX` escape for byte `b` to `buf`.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail.
    write!(buf, "\\u{:04x}", b).unwrap();
}
229
230// ── MetaCache ────────────────────────────────────────────────────────────
231
/// Cached per-entity metadata, mirroring the `entity` table columns of the
/// same names. Stored in the `GraphHandle` LRU cache keyed by entity name.
#[derive(Copy, Clone)]
struct EntityMeta {
    /// Row id of the entity.
    id: i64,
    /// Id of the entity's type in `type_dict`.
    type_id: i64,
    /// Value of the `obs_count` column.
    obs_count: i64,
    /// Value of the `out_deg` column.
    out_deg: i64,
    /// Value of the `in_deg` column.
    in_deg: i64,
}
240
241// ── Transaction guard (RAII rollback on error) ─────────────────────────
242
/// Guard for an open SQLite transaction on a borrowed connection.
struct TxGuard<'a> {
    /// Connection the transaction was started on.
    conn: &'a Connection,
    /// When `false`, dropping the guard issues a `ROLLBACK`.
    done: bool,
}
247
248impl<'a> TxGuard<'a> {
249    fn begin(conn: &'a Connection) -> Result<Self> {
250        // BEGIN IMMEDIATE acquires the WAL write lock up front rather than
251        // lazily on the first write. This makes the busy-timeout apply to lock
252        // acquisition deterministically and avoids `SQLITE_BUSY_SNAPSHOT`
253        // surprises when readers are concurrently active.
254        conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255        Ok(Self { conn, done: false })
256    }
257
258    fn commit(mut self) -> Result<()> {
259        self.done = true;
260        self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261    }
262}
263
264impl Drop for TxGuard<'_> {
265    fn drop(&mut self) {
266        if !self.done {
267            let _ = self.conn.execute_batch("ROLLBACK");
268        }
269    }
270}
271
272// ── Reader pool ───────────────────────────────────────────────────────────
273
/// A small fixed pool of `query_only` SQLite connections used for read
/// operations. WAL mode permits any number of concurrent readers alongside the
/// single writer, so spreading reads across several connections lets them run
/// in parallel instead of serializing on the writer's mutex.
struct ReaderPool {
    /// The pooled connections, each behind its own mutex.
    conns: Vec<Mutex<Connection>>,
    /// Counter used to pick a connection to block on when none is idle.
    next: AtomicUsize,
}
282
283impl ReaderPool {
284    /// Acquire a reader connection. Fast path: grab the first idle one. If every
285    /// connection is busy, block on a round-robin pick so callers still make
286    /// progress (and never spin).
287    fn get(&self) -> MutexGuard<'_, Connection> {
288        for c in &self.conns {
289            if let Some(g) = c.try_lock() {
290                return g;
291            }
292        }
293        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294        self.conns[i].lock()
295    }
296}
297
298// ── GraphHandle ──────────────────────────────────────────────────────────
299
/// Handle to the SQLite-backed knowledge graph: one writer connection, a pool
/// of reader connections, in-memory id counters and an entity metadata cache.
pub struct GraphHandle {
    /// The single read-write connection. SQLite allows only one writer, so all
    /// mutations serialize here.
    writer: Mutex<Connection>,
    /// Pool of `query_only` connections for concurrent reads (WAL).
    readers: ReaderPool,
    /// Last entity id handed out; seeded from the `entity_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_entity: AtomicI64,
    /// Last observation id handed out; seeded from the `obs_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_obs: AtomicI64,
    /// LRU cache mapping entity name to its `EntityMeta`.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
310
311/// Open one `query_only` reader connection against an existing WAL database.
312///
313/// The connection is opened read-write at the OS level (so it can attach to the
314/// `-shm` wal-index — SQLite cannot read a WAL database through a pure
315/// `SQLITE_OPEN_READ_ONLY` handle) and then locked to reads with
316/// `PRAGMA query_only = ON`, which makes any accidental write error out.
317fn open_reader(path: &Path, mmap_size: i64) -> Result<Connection> {
318    let conn = Connection::open_with_flags(
319        path,
320        OpenFlags::SQLITE_OPEN_READ_WRITE
321            | OpenFlags::SQLITE_OPEN_NO_MUTEX
322            | OpenFlags::SQLITE_OPEN_URI,
323    )
324    .map_err(sqlite_err)?;
325    conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
326    conn.execute_batch(
327        "PRAGMA query_only   = ON;
328         PRAGMA cache_size   = -50000;
329         PRAGMA temp_store   = MEMORY;",
330    )
331    .map_err(sqlite_err)?;
332    conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
333        .map_err(sqlite_err)?;
334    Ok(conn)
335}
336
337impl GraphHandle {
338    pub fn new(
339        path: &Path,
340        durability: Durability,
341        mmap_size: i64,
342        lru_cache_size: NonZeroUsize,
343        read_pool_size: usize,
344    ) -> Result<Self> {
345        {
346            let tmp = Connection::open(path).map_err(sqlite_err)?;
347            tmp.execute_batch("PRAGMA page_size = 16384;")
348                .map_err(sqlite_err)?;
349            drop(tmp);
350        }
351
352        let conn = Connection::open(path).map_err(sqlite_err)?;
353        // Apply the busy handler through the API so it is in force for every
354        // subsequent statement (including schema creation and BEGIN IMMEDIATE).
355        conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
356
357        conn.execute_batch(
358             "PRAGMA journal_mode = WAL;
359             PRAGMA foreign_keys = OFF;
360             PRAGMA cache_size    = -50000;
361             PRAGMA temp_store    = MEMORY;
362             PRAGMA busy_timeout  = 5000;
363             PRAGMA synchronous   = NORMAL;
364             PRAGMA journal_size_limit = 67108864;
365
366             CREATE TABLE IF NOT EXISTS entity (
367                 id          INTEGER PRIMARY KEY,
368                 name_hash   INTEGER NOT NULL,
369                 name        TEXT    NOT NULL,
370                 type_id     INTEGER NOT NULL,
371                 obs_count   INTEGER NOT NULL DEFAULT 0,
372                 out_deg     INTEGER NOT NULL DEFAULT 0,
373                 in_deg      INTEGER NOT NULL DEFAULT 0,
374                 created_us  INTEGER NOT NULL,
375                 updated_us  INTEGER NOT NULL,
376                 flags       INTEGER NOT NULL DEFAULT 0
377             ) STRICT;
378
379             CREATE INDEX IF NOT EXISTS entity_by_hash
380                 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
381                 WHERE flags = 0;
382
383             CREATE INDEX IF NOT EXISTS entity_name_ci
384                 ON entity(lower(name))
385                 WHERE flags = 0;
386
387             CREATE TABLE IF NOT EXISTS observation (
388                 id          INTEGER PRIMARY KEY,
389                 entity_id   INTEGER NOT NULL,
390                 idx         INTEGER NOT NULL,
391                 body        TEXT    NOT NULL,
392                 created_us  INTEGER NOT NULL
393             ) STRICT;
394
395             CREATE INDEX IF NOT EXISTS obs_by_entity
396                 ON observation(entity_id, idx);
397
398             CREATE TABLE IF NOT EXISTS relation (
399                 from_id     INTEGER NOT NULL,
400                 to_id       INTEGER NOT NULL,
401                 type_id     INTEGER NOT NULL,
402                 created_us  INTEGER NOT NULL
403             ) STRICT;
404
405             CREATE INDEX IF NOT EXISTS rel_out
406                 ON relation(from_id, type_id, to_id);
407
408             CREATE INDEX IF NOT EXISTS rel_in
409                 ON relation(to_id, type_id, from_id);
410
411             CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
412                 USING fts5(name, content='entity', content_rowid='id',
413                            tokenize='unicode61 remove_diacritics 2');
414
415             CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
416                 USING fts5(body, content='observation', content_rowid='id',
417                            tokenize='unicode61 remove_diacritics 2');
418
419             CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
420               INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
421             END;
422
423             CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
424               INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
425             END;
426
427             CREATE TABLE IF NOT EXISTS type_dict (
428                 id     INTEGER PRIMARY KEY,
429                 kind   INTEGER NOT NULL,
430                 name   TEXT    NOT NULL,
431                 count  INTEGER NOT NULL DEFAULT 0
432             ) STRICT;
433
434             CREATE INDEX IF NOT EXISTS type_by_name
435                 ON type_dict(kind, name);
436
437             CREATE TABLE IF NOT EXISTS graph_stat (
438                 key    TEXT NOT NULL PRIMARY KEY,
439                 value  INTEGER NOT NULL
440             ) STRICT, WITHOUT ROWID;
441
442             CREATE TABLE IF NOT EXISTS hub_degree (
443                 entity_id INTEGER PRIMARY KEY,
444                 out_deg   INTEGER NOT NULL,
445                 in_deg    INTEGER NOT NULL
446             ) STRICT;
447
448             CREATE TABLE IF NOT EXISTS partition_map (
449                 table_name TEXT NOT NULL PRIMARY KEY,
450                 role       INTEGER NOT NULL,
451                 type_id    INTEGER,
452                 row_count  INTEGER NOT NULL DEFAULT 0
453             ) STRICT, WITHOUT ROWID;",
454        )
455        .map_err(sqlite_err)?;
456
457        conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
458            .map_err(sqlite_err)?;
459
460        let sync_pragma = match durability {
461            Durability::Sync => "PRAGMA synchronous = FULL",
462            Durability::Async => "PRAGMA synchronous = NORMAL",
463        };
464        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
465
466        let has_stat: bool = conn
467            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
468            .is_ok();
469        if !has_stat {
470            conn.execute_batch(
471                "INSERT INTO graph_stat(key, value) VALUES
472                 ('entities', 0), ('relations', 0), ('observations', 0),
473                 ('entity_seq', 0), ('obs_seq', 0);",
474            )
475            .map_err(sqlite_err)?;
476        }
477
478        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
479
480        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
481        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
482
483        // Open the reader pool against the now-initialized database. At least one
484        // reader is always created.
485        let pool_size = read_pool_size.max(1);
486        let mut conns = Vec::with_capacity(pool_size);
487        for _ in 0..pool_size {
488            conns.push(Mutex::new(open_reader(path, mmap_size)?));
489        }
490        let readers = ReaderPool {
491            conns,
492            next: AtomicUsize::new(0),
493        };
494
495        Ok(Self {
496            writer: Mutex::new(conn),
497            readers,
498            seq_entity: AtomicI64::new(seq_entity),
499            seq_obs: AtomicI64::new(seq_obs),
500            cache: Mutex::new(LruCache::new(lru_cache_size)),
501        })
502    }
503
504    fn next_entity_id(&self) -> i64 {
505        self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
506    }
507
508    fn next_obs_id(&self) -> i64 {
509        self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
510    }
511
512    fn meta_get(&self, name: &str) -> Option<EntityMeta> {
513        self.cache.lock().get(name).copied()
514    }
515
516    fn meta_set(&self, name: &str, m: EntityMeta) {
517        self.cache.lock().put(name.to_string(), m);
518    }
519
520    fn meta_remove(&self, name: &str) {
521        self.cache.lock().pop(name);
522    }
523
524    fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
525        let mut cache = self.cache.lock();
526        if let Some(m) = cache.get_mut(name) {
527            f(m);
528        }
529    }
530
531    fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
532        if let Some(m) = self.meta_get(name) {
533            return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
534        }
535        let h = name_hash(name);
536        let mut stmt = conn
537            .prepare_cached(
538                "SELECT id, type_id, obs_count, out_deg, in_deg
539                 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
540            )
541            .map_err(sqlite_err)?;
542        match stmt.query_row(params![h, name], |row| {
543            Ok(EntityMeta {
544                id: row.get(0)?,
545                type_id: row.get(1)?,
546                obs_count: row.get(2)?,
547                out_deg: row.get(3)?,
548                in_deg: row.get(4)?,
549            })
550        }) {
551            Ok(m) => {
552                self.meta_set(name, m);
553                Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
554            }
555            Err(e) if is_not_found(&e) => Ok(None),
556            Err(e) => Err(sqlite_err(e)),
557        }
558    }
559
560    fn sync_seqs(&self, conn: &Connection) -> Result<()> {
561        let seq_e = self.seq_entity.load(Ordering::Relaxed);
562        let seq_o = self.seq_obs.load(Ordering::Relaxed);
563        conn.execute(
564            "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
565             WHERE key IN ('entity_seq', 'obs_seq')",
566            params![seq_e, seq_o],
567        )
568        .map_err(sqlite_err)?;
569        Ok(())
570    }
571
572    // ── Public API ──────────────────────────────────────────────────────
573
574    pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
575        if name.is_empty() {
576            return Ok(None);
577        }
578
579        if let Some(m) = self.meta_get(name) {
580            let conn = self.readers.get();
581            let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
582            let observations = load_observations_opt(&conn, m.id);
583            return Ok(Some(Entity {
584                name: name.to_string(),
585                entity_type: etype,
586                observations,
587            }));
588        }
589
590        let conn = self.readers.get();
591        let h = name_hash(name);
592        let mut stmt = conn
593            .prepare_cached(
594                "SELECT e.id, e.type_id, e.name, t.name,
595                        e.obs_count, e.out_deg, e.in_deg
596                 FROM entity e
597                 JOIN type_dict t ON t.id = e.type_id
598                 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
599            )
600            .map_err(sqlite_err)?;
601        match stmt.query_row(params![h, name], |row| {
602            let id: i64 = row.get(0)?;
603            let type_id: i64 = row.get(1)?;
604            let ename: String = row.get(2)?;
605            let etype: String = row.get(3)?;
606            let obs_count: i64 = row.get(4)?;
607            let out_deg: i64 = row.get(5)?;
608            let in_deg: i64 = row.get(6)?;
609            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
610        }) {
611            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
612                let observations = load_observations_opt(&conn, id);
613                drop(stmt);
614                drop(conn);
615                self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
616                Ok(Some(Entity {
617                    name: ename,
618                    entity_type: etype,
619                    observations,
620                }))
621            }
622            Err(e) if is_not_found(&e) => Ok(None),
623            Err(e) => Err(sqlite_err(e)),
624        }
625    }
626
    /// Inserts the given entities and their observations in one write
    /// transaction and returns clones of the entities that were actually
    /// created.
    ///
    /// Entities with an empty name are skipped, as are entities whose name
    /// already belongs to a live (`flags = 0`) row — including an earlier
    /// entry of the same batch. For each created entity the `name_fts` index,
    /// the per-type count and the `entities` / `observations` counters in
    /// `graph_stat` are updated, and the id counters are persisted. The
    /// metadata cache is filled only after the transaction has committed.
    ///
    /// # Errors
    ///
    /// Any SQLite failure aborts the batch; the transaction guard rolls the
    /// transaction back when it is dropped.
    pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
        let conn = self.writer.lock();
        let tx = TxGuard::begin(&conn)?;

        // Insert-if-absent: the NOT EXISTS guard turns a duplicate name into
        // a statement that changes zero rows.
        let mut ins_ent = conn
            .prepare_cached(
                "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
                 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
                 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
            )
            .map_err(sqlite_err)?;

        let mut ins_fts = conn
            .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
            .map_err(sqlite_err)?;

        // One timestamp is shared by every row written in this batch.
        let batch_ts = now_us();
        // Type name -> type id, so each distinct type is resolved once per batch.
        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
        // Type id -> number of entities created with that type.
        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
        let mut total_entities: i64 = 0;
        let mut total_obs: i64 = 0;
        let mut created = Vec::new();
        let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
        // Reused buffer for the multi-row observation INSERT.
        let mut obs_sql = String::new();

        for entity in entities {
            if entity.name.is_empty() {
                continue;
            }
            let h = name_hash(&entity.name);
            // The id is drawn before the existence check, so a skipped
            // duplicate still consumes one id.
            let id = self.next_entity_id();
            let type_id = match type_cache.get(entity.entity_type.as_str()) {
                Some(t) => *t,
                None => {
                    let t = get_type_id(&conn, &entity.entity_type, 0)?;
                    type_cache.insert(entity.entity_type.clone(), t);
                    t
                }
            };
            let obs_count = entity.observations.len() as i64;

            let changed = ins_ent
                .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
                .map_err(sqlite_err)?;
            if changed == 0 {
                // An entity with this name already exists; leave it untouched.
                continue;
            }

            let n = entity.observations.len();
            if n > 0 {
                obs_sql.clear();

                // Observation ids and positions are materialized first so the
                // parameter slice below can borrow them.
                let mut oids = Vec::with_capacity(n);
                let mut idxs = Vec::with_capacity(n);
                for _ in 0..n {
                    oids.push(self.next_obs_id());
                }
                for i in 0..n as i64 {
                    idxs.push(i);
                }

                // Single INSERT with one five-column tuple per observation.
                obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
                for i in 0..n {
                    if i > 0 { obs_sql.push(','); }
                    obs_sql.push_str("(?,?,?,?,?)");
                }

                let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
                for (i, obs) in entity.observations.iter().enumerate() {
                    obs_params.push(&oids[i]);
                    obs_params.push(&id);
                    obs_params.push(&idxs[i]);
                    obs_params.push(obs);
                    obs_params.push(&batch_ts);
                }

                conn.execute(&obs_sql, obs_params.as_slice())
                    .map_err(sqlite_err)?;
            }

            ins_fts
                .execute(params![id, entity.name])
                .map_err(sqlite_err)?;

            *type_deltas.entry(type_id).or_insert(0) += 1;
            total_entities += 1;
            total_obs += obs_count;

            created.push(entity.clone());
            created_metas.push((entity.name.clone(), EntityMeta {
                id,
                type_id,
                obs_count,
                out_deg: 0,
                in_deg: 0,
            }));
        }

        // Counters are written once per batch rather than once per entity.
        if total_entities > 0 {
            for (type_id, delta) in &type_deltas {
                inc_type_count(&conn, *type_id, *delta)?;
            }
            inc_graph_stat(&conn, "entities", total_entities)?;
            inc_graph_stat(&conn, "observations", total_obs)?;
            self.sync_seqs(&conn)?;
        }

        tx.commit()?;

        // Note: `PRAGMA optimize` is intentionally *not* run here. It analyzes
        // indexes and writes to internal stat tables — pure overhead on the
        // write hot path. The periodic `run_maintenance` task covers it.

        // Populate the cache only now that the rows are committed.
        if !created_metas.is_empty() {
            let mut cache = self.cache.lock();
            for (name, meta) in &created_metas {
                cache.put(name.clone(), *meta);
            }
        }

        Ok(created)
    }
749
750    pub fn delete_entities(&self, names: &[String]) -> Result<()> {
751        if names.is_empty() {
752            return Ok(());
753        }
754        let conn = self.writer.lock();
755
756        // Phase 1: Resolve all names to (id, type_id).
757        let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
758        let mut sel = conn
759            .prepare_cached(
760                "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
761            )
762            .map_err(sqlite_err)?;
763        for name in names {
764            let h = name_hash(name);
765            let (id, type_id) = match sel.query_row(params![h, name], |row| {
766                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
767            }) {
768                Ok(v) => v,
769                Err(e) if is_not_found(&e) => continue,
770                Err(e) => return Err(sqlite_err(e)),
771            };
772            resolved.push((id, type_id, name.clone()));
773        }
774
775        if resolved.is_empty() {
776            return Ok(());
777        }
778
779        let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
780        let n = ids.len();
781
782        // Phase 2: Batch DELETE observations.
783        let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
784        let obs_sql = format!(
785            "DELETE FROM observation WHERE entity_id IN ({})",
786            obs_p.join(",")
787        );
788        let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
789        let obs_deleted = conn
790            .execute(&obs_sql, obs_refs.as_slice())
791            .map_err(sqlite_err)? as i64;
792
793        // Phase 3: Batch DELETE relations.
794        let rel_sql = format!(
795            "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
796            obs_p.join(","),
797            obs_p.join(",")
798        );
799        let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
800        let rel_deleted = conn
801            .execute(&rel_sql, rel_refs.as_slice())
802            .map_err(sqlite_err)? as i64;
803
804        // Phase 4: Batch FTS deletes.
805        let fts_values: Vec<String> = (0..n)
806            .map(|_| "('delete', ?, '')".to_string())
807            .collect();
808        let fts_sql = format!(
809            "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
810            fts_values.join(", ")
811        );
812        conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
813            .map_err(sqlite_err)?;
814
815        // Aggregate type count deltas.
816        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
817        for &(_, type_id, _) in &resolved {
818            *type_deltas.entry(type_id).or_insert(0) += 1;
819        }
820
821        // Phase 5: Batch type count decrements.
822        if !type_deltas.is_empty() {
823            let m = type_deltas.len();
824            let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
825            let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
826            let mut case_parts: Vec<String> = Vec::with_capacity(m);
827            let mut id_parts: Vec<String> = Vec::with_capacity(m);
828            for i in 0..m {
829                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
830                id_parts.push(format!("?{}", i + 1));
831            }
832            let sql = format!(
833                "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
834                case_parts.join(" "),
835                id_parts.join(","),
836            );
837            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
838            for id in &type_keys {
839                params.push(Box::new(*id));
840            }
841            for delta in &type_vals {
842                params.push(Box::new(*delta));
843            }
844            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
845            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
846        }
847
848        // Phase 6: Batch DELETE entities.
849        conn.execute(
850            &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
851            ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
852        )
853        .map_err(sqlite_err)?;
854
855        // Phase 7: Update stats.
856        inc_graph_stat(&conn, "entities", -(n as i64))?;
857        inc_graph_stat(&conn, "observations", -obs_deleted)?;
858        inc_graph_stat(&conn, "relations", -rel_deleted)?;
859
860        // Phase 8: Remove from cache.
861        for (_, _, name) in &resolved {
862            self.meta_remove(name);
863        }
864
865        Ok(())
866    }
867
868    pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
869        let conn = self.writer.lock();
870        let tx = TxGuard::begin(&conn)?;
871
872        let mut ins = conn
873            .prepare_cached(
874                "INSERT INTO relation (from_id, to_id, type_id, created_us)
875                 SELECT ?1, ?2, ?3, ?4
876                 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
877            )
878            .map_err(sqlite_err)?;
879
880        let ts = now_us();
881        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
882        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
883        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
884        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
885        let mut total_relations: i64 = 0;
886        let mut created = Vec::new();
887
888        for rel in relations {
889            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
890                Some(v) => v,
891                None => continue,
892            };
893            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
894                Some(v) => v,
895                None => continue,
896            };
897            let type_id = match type_cache.get(rel.relation_type.as_str()) {
898                Some(t) => *t,
899                None => {
900                    let t = get_type_id(&conn, &rel.relation_type, 1)?;
901                    type_cache.insert(rel.relation_type.clone(), t);
902                    t
903                }
904            };
905
906            let changed = ins
907                .execute(params![from_id, to_id, type_id, ts])
908                .map_err(sqlite_err)?;
909            if changed == 0 {
910                continue;
911            }
912
913            *out_deltas.entry(from_id).or_insert(0) += 1;
914            *in_deltas.entry(to_id).or_insert(0) += 1;
915            *type_deltas.entry(type_id).or_insert(0) += 1;
916            total_relations += 1;
917
918            created.push(rel.clone());
919        }
920
921        if total_relations > 0 {
922            for (id, delta) in &out_deltas {
923                conn.execute(
924                    "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
925                    params![delta, id],
926                )
927                .map_err(sqlite_err)?;
928            }
929            for (id, delta) in in_deltas {
930                conn.execute(
931                    "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
932                    params![delta, id],
933                )
934                .map_err(sqlite_err)?;
935            }
936            for (type_id, delta) in &type_deltas {
937                inc_type_count(&conn, *type_id, *delta)?;
938            }
939            inc_graph_stat(&conn, "relations", total_relations)?;
940        }
941
942        tx.commit()?;
943
944        // See `create_entities`: `PRAGMA optimize` is deferred to maintenance.
945
946        if !created.is_empty() {
947            let mut cache = self.cache.lock();
948            for rel in &created {
949                if let Some(m) = cache.get_mut(&rel.from) {
950                    m.out_deg += 1;
951                }
952                if let Some(m) = cache.get_mut(&rel.to) {
953                    m.in_deg += 1;
954                }
955            }
956        }
957
958        Ok(created)
959    }
960
961    pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
962        if relations.is_empty() {
963            return Ok(());
964        }
965        let conn = self.writer.lock();
966
967        // Resolve names to IDs and collect valid triples.
968        let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
969        let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
970        for rel in relations {
971            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
972                Some(v) => v,
973                None => continue,
974            };
975            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
976                Some(v) => v,
977                None => continue,
978            };
979            let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
980                Ok(id) => id,
981                Err(_) => continue,
982            };
983            triples.push((from_id, to_id, type_id));
984            names.push((rel.from.clone(), rel.to.clone()));
985        }
986
987        if triples.is_empty() {
988            return Ok(());
989        }
990
991        // Batch DELETE using VALUES subquery.
992        let mut sql = String::from(
993            "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
994        );
995        for (i, _) in triples.iter().enumerate() {
996            if i > 0 {
997                sql.push_str(", ");
998            }
999            let base = (i * 3) + 1;
1000            sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1001        }
1002        sql.push(')');
1003
1004        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1005        for &(f, t, tp) in &triples {
1006            param_values.push(Box::new(f));
1007            param_values.push(Box::new(t));
1008            param_values.push(Box::new(tp));
1009        }
1010        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1011        let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1012        if total == 0 {
1013            return Ok(());
1014        }
1015
1016        // Aggregate degree and type deltas.
1017        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1018        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1019        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1020        for &(from_id, to_id, type_id) in &triples {
1021            *out_deltas.entry(from_id).or_insert(0) += 1;
1022            *in_deltas.entry(to_id).or_insert(0) += 1;
1023            *type_deltas.entry(type_id).or_insert(0) += 1;
1024        }
1025
1026        // Batch out_deg updates.
1027        let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1028        let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1029        if !out_keys.is_empty() {
1030            let m = out_keys.len();
1031            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1032            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1033            for i in 0..m {
1034                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1035                id_parts.push(format!("?{}", i + 1));
1036            }
1037            let sql = format!(
1038                "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1039                case_parts.join(" "),
1040                id_parts.join(","),
1041            );
1042            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1043            for id in &out_keys {
1044                params.push(Box::new(*id));
1045            }
1046            for delta in &out_vals {
1047                params.push(Box::new(*delta));
1048            }
1049            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1050            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1051        }
1052
1053        // Batch in_deg updates.
1054        let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1055        let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1056        if !in_keys.is_empty() {
1057            let m = in_keys.len();
1058            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1059            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1060            for i in 0..m {
1061                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1062                id_parts.push(format!("?{}", i + 1));
1063            }
1064            let sql = format!(
1065                "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1066                case_parts.join(" "),
1067                id_parts.join(","),
1068            );
1069            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1070            for id in &in_keys {
1071                params.push(Box::new(*id));
1072            }
1073            for delta in &in_vals {
1074                params.push(Box::new(*delta));
1075            }
1076            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1077            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1078        }
1079
1080        // Batch type_dict updates.
1081        let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1082        let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1083        if !type_keys.is_empty() {
1084            let m = type_keys.len();
1085            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1086            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1087            for i in 0..m {
1088                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1089                id_parts.push(format!("?{}", i + 1));
1090            }
1091            let sql = format!(
1092                "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1093                case_parts.join(" "),
1094                id_parts.join(","),
1095            );
1096            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1097            for id in &type_keys {
1098                params.push(Box::new(*id));
1099            }
1100            for delta in &type_vals {
1101                params.push(Box::new(*delta));
1102            }
1103            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1104            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1105        }
1106
1107        inc_graph_stat(&conn, "relations", -(total as i64))?;
1108
1109        // Update cache for resolved triples (self-heals on next reload if
1110        // a triple happened to not match).
1111        for (from, to) in &names {
1112            self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1113            self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1114        }
1115
1116        Ok(())
1117    }
1118
1119    pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1120        let conn = self.writer.lock();
1121        let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1122            Some(v) => v,
1123            None => {
1124                return Err(MCSError::InvalidParams(format!(
1125                    "Entity '{entity_name}' not found"
1126                )))
1127            }
1128        };
1129
1130        let mut max_idx: i64 = conn
1131            .query_row(
1132                "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1133                params![id],
1134                |row| row.get(0),
1135            )
1136            .map_err(sqlite_err)?;
1137
1138        let ts = now_us();
1139        let mut ins_obs = conn
1140            .prepare_cached(
1141                "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1142            )
1143            .map_err(sqlite_err)?;
1144
1145        for content in contents {
1146            max_idx += 1;
1147            let oid = self.next_obs_id();
1148            ins_obs
1149                .execute(params![oid, id, max_idx, content, ts])
1150                .map_err(sqlite_err)?;
1151        }
1152        let added = contents.to_vec();
1153
1154        let count: i64 = contents.len() as i64;
1155        conn.execute(
1156            "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1157            params![count, ts, id],
1158        )
1159        .map_err(sqlite_err)?;
1160
1161        inc_graph_stat(&conn, "observations", count)?;
1162        self.sync_seqs(&conn)?;
1163
1164        self.meta_update(entity_name, |m| m.obs_count += count);
1165
1166        Ok(added)
1167    }
1168
1169    pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1170        if observations.is_empty() {
1171            return Ok(());
1172        }
1173        let conn = self.writer.lock();
1174        let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1175            Some(v) => v,
1176            None => {
1177                return Err(MCSError::InvalidParams(format!(
1178                    "Entity '{entity_name}' not found"
1179                )))
1180            }
1181        };
1182
1183        let placeholders: Vec<String> = (0..observations.len())
1184            .map(|i| format!("?{}", i + 2))
1185            .collect();
1186        let sql = format!(
1187            "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1188            placeholders.join(",")
1189        );
1190
1191        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1192        param_values.push(Box::new(id));
1193        for obs in observations {
1194            param_values.push(Box::new(obs.as_str()));
1195        }
1196        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1197        let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1198
1199        if removed > 0 {
1200            conn.execute(
1201                "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1202                params![removed, now_us(), id],
1203            )
1204            .map_err(sqlite_err)?;
1205            inc_graph_stat(&conn, "observations", -removed)?;
1206
1207            self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1208        }
1209
1210        Ok(())
1211    }
1212
1213    pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1214        let mut results = Vec::new();
1215        for entity in entities {
1216            if let Some(existing) = self.get_entity(&entity.name)? {
1217                // Update type if different.
1218                if existing.entity_type != entity.entity_type {
1219                    let conn = self.writer.lock();
1220                    let old_type_id = conn
1221                        .query_row(
1222                            "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1223                            params![name_hash(&entity.name), entity.name],
1224                            |row| row.get::<_, i64>(0),
1225                        )
1226                        .map_err(sqlite_err)?;
1227                    let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1228                    inc_type_count(&conn, old_type_id, -1)?;
1229                    inc_type_count(&conn, new_type_id, 1)?;
1230                    conn.execute(
1231                        "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1232                        params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1233                    )
1234                    .map_err(sqlite_err)?;
1235                    // Invalidate cache so subsequent get_entity reloads meta.
1236                    self.meta_remove(&entity.name);
1237                }
1238                // Merge observations (append new ones not already present).
1239                let existing_set: HashSet<&str> =
1240                    existing.observations.iter().map(|s| s.as_str()).collect();
1241                let to_add: Vec<String> = entity
1242                    .observations
1243                    .iter()
1244                    .filter(|o| !existing_set.contains(o.as_str()))
1245                    .cloned()
1246                    .collect();
1247                if !to_add.is_empty() {
1248                    self.add_observations(&entity.name, &to_add)?;
1249                }
1250                let updated = self
1251                    .get_entity(&entity.name)?
1252                    .unwrap_or(entity.clone());
1253                results.push(updated);
1254            } else {
1255                let c = self.create_entities(&[entity.clone()])?;
1256                if let Some(e) = c.into_iter().next() {
1257                    results.push(e);
1258                }
1259            }
1260        }
1261        Ok(results)
1262    }
1263
1264    pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1265        let conn = self.writer.lock();
1266        let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1267            Some(v) => v,
1268            None => {
1269                return Err(MCSError::InvalidParams(format!(
1270                    "Source entity '{source}' not found"
1271                )))
1272            }
1273        };
1274        let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1275            Some(v) => v,
1276            None => {
1277                return Err(MCSError::InvalidParams(format!(
1278                    "Target entity '{target}' not found"
1279                )))
1280            }
1281        };
1282
1283        if src_id == tgt_id {
1284            return self.get_entity(target)?.ok_or_else(|| {
1285                MCSError::InvalidParams("Target entity not found after merge".into())
1286            });
1287        }
1288
1289        // Move observations from source to target.
1290        let mut obs_count: i64 = 0;
1291        {
1292            let mut max_idx: i64 = conn
1293                .query_row(
1294                    "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1295                    params![tgt_id],
1296                    |row| row.get(0),
1297                )
1298                .map_err(sqlite_err)?;
1299            let mut sel_obs = conn
1300                .prepare_cached(
1301                    "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1302                )
1303                .map_err(sqlite_err)?;
1304            let mut upd_obs = conn
1305                .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1306                .map_err(sqlite_err)?;
1307            let rows: Vec<(i64, String)> = sel_obs
1308                .query_map(params![src_id], |row| {
1309                    Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1310                })
1311                .map_err(sqlite_err)?
1312                .filter_map(|r| r.ok())
1313                .collect();
1314            for (oid, _body) in &rows {
1315                max_idx += 1;
1316                upd_obs
1317                    .execute(params![tgt_id, max_idx, oid])
1318                    .map_err(sqlite_err)?;
1319                obs_count += 1;
1320            }
1321        }
1322
1323        // Move relations from source to target.
1324        conn.execute(
1325            "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1326            params![tgt_id, src_id],
1327        )
1328        .map_err(sqlite_err)?;
1329        conn.execute(
1330            "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1331            params![tgt_id, src_id],
1332        )
1333        .map_err(sqlite_err)?;
1334        // Delete orphaned relations that were updated by the above (the "OR IGNORE"
1335        // keeps the first, but we still have the original row with the old id? No —
1336        // UPDATE OR IGNORE won't remove. So we must delete any that still reference src_id.)
1337        conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1338            .map_err(sqlite_err)?;
1339        conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1340            .map_err(sqlite_err)?;
1341
1342        // Update degrees on target.
1343        let out_add: i64 = conn
1344            .query_row(
1345                "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1346                params![tgt_id],
1347                |row| row.get(0),
1348            )
1349            .map_err(sqlite_err)?;
1350        let in_add: i64 = conn
1351            .query_row(
1352                "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1353                params![tgt_id],
1354                |row| row.get(0),
1355            )
1356            .map_err(sqlite_err)?;
1357        conn.execute(
1358            "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1359            params![out_add, in_add, obs_count, now_us(), tgt_id],
1360        )
1361        .map_err(sqlite_err)?;
1362
1363        // Delete source entity.
1364        conn.execute(
1365            "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1366            params![src_id],
1367        )
1368        .map_err(sqlite_err)?;
1369        conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1370            .map_err(sqlite_err)?;
1371
1372        inc_graph_stat(&conn, "entities", -1)?;
1373        self.meta_remove(source);
1374
1375        // Reload target into cache.
1376        if let Ok(meta) = conn.query_row(
1377            "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1378            params![tgt_id],
1379            |row| {
1380                Ok(EntityMeta {
1381                    id: row.get(0)?,
1382                    type_id: row.get(1)?,
1383                    obs_count: row.get(2)?,
1384                    out_deg: row.get(3)?,
1385                    in_deg: row.get(4)?,
1386                })
1387            },
1388        ) {
1389            self.meta_set(target, meta);
1390        }
1391
1392        let (name, etype): (String, String) = conn
1393            .query_row(
1394                "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1395                params![tgt_id],
1396                |row| Ok((row.get(0)?, row.get(1)?)),
1397            )
1398            .map_err(sqlite_err)?;
1399        let observations = load_observations_opt(&conn, tgt_id);
1400
1401        Ok(Entity {
1402            name,
1403            entity_type: etype,
1404            observations,
1405        })
1406    }
1407
1408    pub fn search_nodes_filtered(
1409        &self,
1410        query: &str,
1411        filter_type: Option<&str>,
1412        offset: usize,
1413        limit: usize,
1414    ) -> Vec<Entity> {
1415        if query.is_empty() {
1416            return Vec::new();
1417        }
1418        let conn = self.readers.get();
1419
1420        // Single pass: collect IDs from name_fts then obs_fts, deduping with a set.
1421        let mut entity_ids: Vec<i64> = Vec::new();
1422        let mut seen: HashSet<i64> = HashSet::new();
1423
1424        if let Ok(mut stmt) = conn.prepare(
1425            "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1426        ) {
1427            let limit_i64 = (limit + offset) as i64;
1428            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1429                row.get::<_, i64>(0)
1430            }) {
1431                for row in rows.flatten() {
1432                    if seen.insert(row) {
1433                        entity_ids.push(row);
1434                    }
1435                }
1436            }
1437        }
1438
1439        if let Ok(mut stmt) = conn.prepare(
1440            "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1441             WHERE obs_fts MATCH ?1
1442             GROUP BY entity_id
1443             LIMIT ?2",
1444        ) {
1445            let limit_i64 = (limit + offset) as i64;
1446            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1447                row.get::<_, i64>(0)
1448            }) {
1449                for row in rows.flatten() {
1450                    if seen.insert(row) {
1451                        entity_ids.push(row);
1452                    }
1453                }
1454            }
1455        }
1456
1457        // Apply filter_type, offset, limit.
1458        let mut results = Vec::new();
1459        let mut count: usize = 0;
1460        for eid in entity_ids {
1461            if let Ok(entity) = entity_by_id(&conn, eid) {
1462                if let Some(ft) = filter_type {
1463                    if !ft.is_empty() && entity.entity_type != ft {
1464                        continue;
1465                    }
1466                }
1467                if count < offset {
1468                    count += 1;
1469                    continue;
1470                }
1471                if results.len() >= limit {
1472                    break;
1473                }
1474                results.push(entity);
1475                count += 1;
1476            }
1477        }
1478
1479        results
1480    }
1481
1482    pub fn read_graph_filtered(
1483        &self,
1484        filter_type: Option<&str>,
1485        offset: usize,
1486        limit: usize,
1487    ) -> Result<String> {
1488        let conn = self.readers.get();
1489
1490        let limit_sql: i64 = if limit == usize::MAX {
1491            -1
1492        } else {
1493            limit.min(i64::MAX as usize) as i64
1494        };
1495        let offset_sql: i64 = offset as i64;
1496
1497        // Resolve the requested page of entity ids first. Relations are then
1498        // scoped to edges whose *both* endpoints fall inside this page, which
1499        // keeps the response self-consistent (no dangling references to
1500        // entities that were paged out) and bounds the relation payload by the
1501        // page size instead of dumping every relation in the graph.
1502        let filter = filter_type.filter(|ft| !ft.is_empty());
1503        let ids: Vec<i64> = if let Some(ft) = filter {
1504            let mut stmt = conn
1505                .prepare_cached(
1506                    "SELECT e.id FROM entity e
1507                     WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1508                       AND e.flags = 0
1509                     ORDER BY e.id LIMIT ?2 OFFSET ?3",
1510                )
1511                .map_err(sqlite_err)?;
1512            stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1513                .map_err(sqlite_err)?
1514                .filter_map(|r| r.ok())
1515                .collect()
1516        } else {
1517            let mut stmt = conn
1518                .prepare_cached(
1519                    "SELECT e.id FROM entity e WHERE e.flags = 0
1520                     ORDER BY e.id LIMIT ?1 OFFSET ?2",
1521                )
1522                .map_err(sqlite_err)?;
1523            stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1524                .map_err(sqlite_err)?
1525                .filter_map(|r| r.ok())
1526                .collect()
1527        };
1528
1529        if ids.is_empty() {
1530            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1531        }
1532
1533        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1534
1535        let entities_json: String = {
1536            let sql = format!(
1537                "SELECT COALESCE(json_group_array(json_object(
1538                    'name', e.name,
1539                    'entityType', t.name,
1540                    'observations', COALESCE((
1541                        SELECT json_group_array(o.body ORDER BY o.idx)
1542                        FROM observation o WHERE o.entity_id = e.id
1543                    ), json('[]'))
1544                ) ORDER BY e.id), json('[]'))
1545                FROM entity e
1546                JOIN type_dict t ON t.id = e.type_id
1547                WHERE e.id IN ({placeholders}) AND e.flags = 0"
1548            );
1549            conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1550                row.get::<_, String>(0)
1551            })
1552            .map_err(sqlite_err)?
1553        };
1554
1555        let relations_json: String = {
1556            let sql = format!(
1557                "SELECT COALESCE(json_group_array(json_object(
1558                    'from', e1.name,
1559                    'to', e2.name,
1560                    'relationType', t.name
1561                )), json('[]'))
1562                FROM relation r
1563                JOIN entity e1 ON e1.id = r.from_id
1564                JOIN entity e2 ON e2.id = r.to_id
1565                JOIN type_dict t ON t.id = r.type_id
1566                WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1567                  AND e1.flags = 0 AND e2.flags = 0"
1568            );
1569            let all_params: Vec<&dyn ToSql> = ids
1570                .iter()
1571                .map(|id| id as &dyn ToSql)
1572                .chain(ids.iter().map(|id| id as &dyn ToSql))
1573                .collect();
1574            conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1575                .map_err(sqlite_err)?
1576        };
1577
1578        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1579        out.push_str("{\"entities\":");
1580        out.push_str(&entities_json);
1581        out.push_str(",\"relations\":");
1582        out.push_str(&relations_json);
1583        out.push('}');
1584        Ok(out)
1585    }
1586
1587    pub fn open_nodes(&self, names: &[String]) -> String {
1588        let conn = self.readers.get();
1589        let mut entity_ids: Vec<i64> = Vec::new();
1590
1591        for name in names {
1592            let h = name_hash(name);
1593            if let Ok(Some(id)) = conn
1594                .query_row(
1595                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1596                    params![h, name],
1597                    |row| row.get::<_, i64>(0),
1598                )
1599                .map(Some)
1600                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1601            {
1602                entity_ids.push(id);
1603            }
1604        }
1605
1606        if entity_ids.is_empty() {
1607            return r#"{"entities":[],"relations":[]}"#.to_string();
1608        }
1609
1610        let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1611        let ids_str = placeholders.join(",");
1612
1613        let entities_json: String = {
1614            let sql = format!(
1615                "SELECT COALESCE(json_group_array(json_object(
1616                    'name', e.name,
1617                    'entityType', t.name,
1618                    'observations', COALESCE((
1619                        SELECT json_group_array(o.body ORDER BY o.idx)
1620                        FROM observation o WHERE o.entity_id = e.id
1621                    ), json('[]'))
1622                ) ORDER BY e.id), json('[]'))
1623                FROM entity e
1624                JOIN type_dict t ON t.id = e.type_id
1625                WHERE e.id IN ({ids_str}) AND e.flags = 0"
1626            );
1627            conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1628                row.get::<_, String>(0)
1629            })
1630            .unwrap_or_else(|_| "[]".to_string())
1631        };
1632
1633        let relations_json: String = {
1634            let sql = format!(
1635                "SELECT COALESCE(json_group_array(json_object(
1636                    'from', e1.name,
1637                    'to', e2.name,
1638                    'relationType', t.name
1639                )), json('[]'))
1640                FROM relation r
1641                JOIN entity e1 ON e1.id = r.from_id
1642                JOIN entity e2 ON e2.id = r.to_id
1643                JOIN type_dict t ON t.id = r.type_id
1644                WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1645                  AND e1.flags = 0 AND e2.flags = 0"
1646            );
1647            let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1648                .iter()
1649                .map(|id| id as &dyn rusqlite::types::ToSql)
1650                .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1651                .collect();
1652            let mut stmt = conn.prepare(&sql).unwrap();
1653            stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1654                .unwrap_or_else(|_| "[]".to_string())
1655        };
1656
1657        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1658        out.push_str("{\"entities\":");
1659        out.push_str(&entities_json);
1660        out.push_str(",\"relations\":");
1661        out.push_str(&relations_json);
1662        out.push('}');
1663        out
1664    }
1665
1666    pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1667        let conn = self.readers.get();
1668        let mut results = Vec::with_capacity(names.len());
1669        for name in names {
1670            let h = name_hash(name);
1671            let exists: bool = conn
1672                .query_row(
1673                    "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1674                    params![h, name],
1675                    |_| Ok(()),
1676                )
1677                .is_ok();
1678            results.push(exists);
1679        }
1680        Ok(results)
1681    }
1682
1683    pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1684        let conn = self.readers.get();
1685        let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1686            Some(v) => v,
1687            None => {
1688                return Err(MCSError::InvalidParams(format!(
1689                    "Entity '{name}' not found"
1690                )))
1691            }
1692        };
1693        Ok(match direction {
1694            Direction::Outgoing => out_d as usize,
1695            Direction::Incoming => in_d as usize,
1696            Direction::Both => (out_d + in_d) as usize,
1697        })
1698    }
1699
1700    pub fn get_entity_count(&self) -> Result<usize> {
1701        let conn = self.readers.get();
1702        read_graph_stat(&conn, "entities")
1703            .map(|v| v as usize)
1704            .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1705    }
1706
1707    pub fn get_relation_count(&self) -> Result<usize> {
1708        let conn = self.readers.get();
1709        read_graph_stat(&conn, "relations")
1710            .map(|v| v as usize)
1711            .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1712    }
1713
1714    pub fn search_relations(
1715        &self,
1716        from: Option<&str>,
1717        to: Option<&str>,
1718        rtype: Option<&str>,
1719    ) -> Vec<Relation> {
1720        let conn = self.readers.get();
1721        let mut results = Vec::new();
1722
1723        // A filter that is supplied but resolves to nothing uses the sentinel
1724        // id -1 (which matches no row), so the query returns empty rather than
1725        // silently dropping the filter and matching every relation. The lookups
1726        // are read-only — `get_type_id` would *insert* a phantom type, which is
1727        // both wrong and impossible on a `query_only` reader connection.
1728        let from_id = from
1729            .filter(|f| !f.is_empty())
1730            .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1731        let to_id = to
1732            .filter(|t| !t.is_empty())
1733            .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1734        let type_id = rtype
1735            .filter(|rt| !rt.is_empty())
1736            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1737
1738        match (from_id, to_id, type_id) {
1739            (Some(fid), Some(tid), Some(tpid)) => {
1740                if let Ok(mut stmt) = conn.prepare_cached(
1741                    "SELECT e1.name, e2.name, t.name
1742                     FROM relation r
1743                     JOIN entity e1 ON e1.id = r.from_id
1744                     JOIN entity e2 ON e2.id = r.to_id
1745                     JOIN type_dict t ON t.id = r.type_id
1746                     WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1747                       AND e1.flags = 0 AND e2.flags = 0
1748                     ORDER BY r.from_id, r.to_id"
1749                ) {
1750                    if let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1751                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1752                    }) {
1753                        for row in rows.flatten() { results.push(row); }
1754                    }
1755                }
1756            }
1757            (Some(fid), Some(tid), None) => {
1758                if let Ok(mut stmt) = conn.prepare_cached(
1759                    "SELECT e1.name, e2.name, t.name
1760                     FROM relation r
1761                     JOIN entity e1 ON e1.id = r.from_id
1762                     JOIN entity e2 ON e2.id = r.to_id
1763                     JOIN type_dict t ON t.id = r.type_id
1764                     WHERE r.from_id = ?1 AND r.to_id = ?2
1765                       AND e1.flags = 0 AND e2.flags = 0
1766                     ORDER BY r.from_id, r.to_id"
1767                ) {
1768                    if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1769                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1770                    }) {
1771                        for row in rows.flatten() { results.push(row); }
1772                    }
1773                }
1774            }
1775            (Some(fid), None, Some(tpid)) => {
1776                if let Ok(mut stmt) = conn.prepare_cached(
1777                    "SELECT e1.name, e2.name, t.name
1778                     FROM relation r
1779                     JOIN entity e1 ON e1.id = r.from_id
1780                     JOIN entity e2 ON e2.id = r.to_id
1781                     JOIN type_dict t ON t.id = r.type_id
1782                     WHERE r.from_id = ?1 AND r.type_id = ?2
1783                       AND e1.flags = 0 AND e2.flags = 0
1784                     ORDER BY r.from_id, r.to_id"
1785                ) {
1786                    if let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1787                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1788                    }) {
1789                        for row in rows.flatten() { results.push(row); }
1790                    }
1791                }
1792            }
1793            (None, Some(tid), Some(tpid)) => {
1794                if let Ok(mut stmt) = conn.prepare_cached(
1795                    "SELECT e1.name, e2.name, t.name
1796                     FROM relation r
1797                     JOIN entity e1 ON e1.id = r.from_id
1798                     JOIN entity e2 ON e2.id = r.to_id
1799                     JOIN type_dict t ON t.id = r.type_id
1800                     WHERE r.to_id = ?1 AND r.type_id = ?2
1801                       AND e1.flags = 0 AND e2.flags = 0
1802                     ORDER BY r.from_id, r.to_id"
1803                ) {
1804                    if let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1805                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1806                    }) {
1807                        for row in rows.flatten() { results.push(row); }
1808                    }
1809                }
1810            }
1811            (Some(fid), None, None) => {
1812                if let Ok(mut stmt) = conn.prepare_cached(
1813                    "SELECT e1.name, e2.name, t.name
1814                     FROM relation r
1815                     JOIN entity e1 ON e1.id = r.from_id
1816                     JOIN entity e2 ON e2.id = r.to_id
1817                     JOIN type_dict t ON t.id = r.type_id
1818                     WHERE r.from_id = ?1
1819                       AND e1.flags = 0 AND e2.flags = 0
1820                     ORDER BY r.from_id, r.to_id"
1821                ) {
1822                    if let Ok(rows) = stmt.query_map(params![fid], |row| {
1823                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1824                    }) {
1825                        for row in rows.flatten() { results.push(row); }
1826                    }
1827                }
1828            }
1829            (None, Some(tid), None) => {
1830                if let Ok(mut stmt) = conn.prepare_cached(
1831                    "SELECT e1.name, e2.name, t.name
1832                     FROM relation r
1833                     JOIN entity e1 ON e1.id = r.from_id
1834                     JOIN entity e2 ON e2.id = r.to_id
1835                     JOIN type_dict t ON t.id = r.type_id
1836                     WHERE r.to_id = ?1
1837                       AND e1.flags = 0 AND e2.flags = 0
1838                     ORDER BY r.from_id, r.to_id"
1839                ) {
1840                    if let Ok(rows) = stmt.query_map(params![tid], |row| {
1841                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1842                    }) {
1843                        for row in rows.flatten() { results.push(row); }
1844                    }
1845                }
1846            }
1847            (None, None, Some(tpid)) => {
1848                if let Ok(mut stmt) = conn.prepare_cached(
1849                    "SELECT e1.name, e2.name, t.name
1850                     FROM relation r
1851                     JOIN entity e1 ON e1.id = r.from_id
1852                     JOIN entity e2 ON e2.id = r.to_id
1853                     JOIN type_dict t ON t.id = r.type_id
1854                     WHERE r.type_id = ?1
1855                       AND e1.flags = 0 AND e2.flags = 0
1856                     ORDER BY r.from_id, r.to_id"
1857                ) {
1858                    if let Ok(rows) = stmt.query_map(params![tpid], |row| {
1859                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1860                    }) {
1861                        for row in rows.flatten() { results.push(row); }
1862                    }
1863                }
1864            }
1865            (None, None, None) => {
1866                if let Ok(mut stmt) = conn.prepare_cached(
1867                    "SELECT e1.name, e2.name, t.name
1868                     FROM relation r
1869                     JOIN entity e1 ON e1.id = r.from_id
1870                     JOIN entity e2 ON e2.id = r.to_id
1871                     JOIN type_dict t ON t.id = r.type_id
1872                     WHERE e1.flags = 0 AND e2.flags = 0
1873                     ORDER BY r.from_id, r.to_id"
1874                ) {
1875                    if let Ok(rows) = stmt.query_map([], |row| {
1876                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1877                    }) {
1878                        for row in rows.flatten() { results.push(row); }
1879                    }
1880                }
1881            }
1882        }
1883        results
1884    }
1885
1886    pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1887        let conn = self.readers.get();
1888        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1889            Some(v) => v,
1890            None => {
1891                return Err(MCSError::InvalidParams(format!(
1892                    "Source entity '{from}' not found"
1893                )))
1894            }
1895        };
1896        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1897            Some(v) => v,
1898            None => {
1899                return Err(MCSError::InvalidParams(format!(
1900                    "Target entity '{to}' not found"
1901                )))
1902            }
1903        };
1904
1905        if from_id == to_id {
1906            return Ok(Some(vec![from.to_string()]));
1907        }
1908
1909        // BFS with adjacency from relation table.
1910        let mut visited = HashSet::new();
1911        let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1912        let mut queue = VecDeque::new();
1913        visited.insert(from_id);
1914        queue.push_back(from_id);
1915
1916        while let Some(cur) = queue.pop_front() {
1917            if cur == to_id {
1918                break;
1919            }
1920            // Fetch out-neighbors.
1921            if let Ok(mut stmt) =
1922                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1923            {
1924                if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1925                    for row in rows.flatten() {
1926                        if visited.insert(row) {
1927                            parent.insert(row, cur);
1928                            queue.push_back(row);
1929                        }
1930                    }
1931                }
1932            }
1933            // Also check in-neighbors (undirected traversal).
1934            if let Ok(mut stmt) =
1935                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1936            {
1937                if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1938                    for row in rows.flatten() {
1939                        if visited.insert(row) {
1940                            parent.insert(row, cur);
1941                            queue.push_back(row);
1942                        }
1943                    }
1944                }
1945            }
1946        }
1947
1948        if !parent.contains_key(&to_id) && to_id != from_id {
1949            return Ok(None);
1950        }
1951
1952        let mut path = Vec::new();
1953        let mut cur = to_id;
1954        path.push(cur);
1955        while let Some(&p) = parent.get(&cur) {
1956            path.push(p);
1957            cur = p;
1958            if cur == from_id {
1959                break;
1960            }
1961        }
1962        path.reverse();
1963
1964        let mut name_path = Vec::with_capacity(path.len());
1965        for id in path {
1966            if let Ok(name) = conn.query_row(
1967                "SELECT name FROM entity WHERE id = ?1",
1968                params![id],
1969                |row| row.get::<_, String>(0),
1970            ) {
1971                name_path.push(name);
1972            }
1973        }
1974
1975        Ok(Some(name_path))
1976    }
1977
1978    pub fn compact(&self) -> Result<()> {
1979        let conn = self.writer.lock();
1980        conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
1981        Ok(())
1982    }
1983
1984    pub fn neighbors(
1985        &self,
1986        name: &str,
1987        direction: Direction,
1988        rtype: Option<&str>,
1989        depth: u32,
1990    ) -> Result<String> {
1991        self._traverse(name, direction, rtype, depth, true)
1992    }
1993
1994    pub fn extract_subgraph(
1995        &self,
1996        names: &[String],
1997        depth: u32,
1998    ) -> Result<String> {
1999        if names.is_empty() {
2000            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2001        }
2002
2003        let conn = self.readers.get();
2004        let mut all_entity_ids: HashSet<i64> = HashSet::new();
2005        let mut frontier: HashSet<i64> = HashSet::new();
2006        let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2007
2008        // Resolve seed entities.
2009        for name in names {
2010            let h = name_hash(name);
2011            if let Ok(Some(id)) = conn
2012                .query_row(
2013                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2014                    params![h, name],
2015                    |row| row.get::<_, i64>(0),
2016                )
2017                .map(Some)
2018                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2019            {
2020                all_entity_ids.insert(id);
2021                frontier.insert(id);
2022            }
2023        }
2024
2025        let mut current_depth = 0u32;
2026        while current_depth < depth && !frontier.is_empty() {
2027            let mut next_frontier: HashSet<i64> = HashSet::new();
2028
2029            // Collect relations for current frontier.
2030            for fid in &frontier {
2031                if let Ok(mut stmt) = conn.prepare_cached(
2032                    "SELECT from_id, to_id, type_id FROM relation WHERE from_id = ?1",
2033                ) {
2034                    if let Ok(rows) =
2035                        stmt.query_map(params![fid], |row| {
2036                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2037                        })
2038                    {
2039                        for row in rows.flatten() {
2040                            let (from_id, to_id, type_id) = row;
2041                            all_rel_pairs.insert((from_id, to_id, type_id));
2042                            if all_entity_ids.insert(to_id) {
2043                                next_frontier.insert(to_id);
2044                            }
2045                        }
2046                    }
2047                }
2048                if let Ok(mut stmt) = conn.prepare_cached(
2049                    "SELECT from_id, to_id, type_id FROM relation WHERE to_id = ?1",
2050                ) {
2051                    if let Ok(rows) =
2052                        stmt.query_map(params![fid], |row| {
2053                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2054                        })
2055                    {
2056                        for row in rows.flatten() {
2057                            let (from_id, to_id, type_id) = row;
2058                            all_rel_pairs.insert((from_id, to_id, type_id));
2059                            if all_entity_ids.insert(from_id) {
2060                                next_frontier.insert(from_id);
2061                            }
2062                        }
2063                    }
2064                }
2065            }
2066            frontier = next_frontier;
2067            current_depth += 1;
2068        }
2069
2070        let entities_json: String = {
2071            if all_entity_ids.is_empty() {
2072                "[]".to_string()
2073            } else {
2074                let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2075                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2076                let sql = format!(
2077                    "SELECT COALESCE(json_group_array(json_object(
2078                        'name', e.name,
2079                        'entityType', t.name,
2080                        'observations', COALESCE((
2081                            SELECT json_group_array(o.body ORDER BY o.idx)
2082                            FROM observation o WHERE o.entity_id = e.id
2083                        ), json('[]'))
2084                    ) ORDER BY e.id), json('[]'))
2085                    FROM entity e
2086                    JOIN type_dict t ON t.id = e.type_id
2087                    WHERE e.id IN ({}) AND e.flags = 0",
2088                    placeholders.join(",")
2089                );
2090                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2091                    row.get::<_, String>(0)
2092                })
2093                .unwrap_or_else(|_| "[]".to_string())
2094            }
2095        };
2096
2097        let relations_json: String = {
2098            if all_rel_pairs.is_empty() {
2099                "[]".to_string()
2100            } else {
2101                let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2102                let sql = format!(
2103                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2104                    SELECT COALESCE(json_group_array(json_object(
2105                        'from', e1.name,
2106                        'to', e2.name,
2107                        'relationType', t.name
2108                    )), json('[]'))
2109                    FROM r
2110                    JOIN entity e1 ON e1.id = r.from_id
2111                    JOIN entity e2 ON e2.id = r.to_id
2112                    JOIN type_dict t ON t.id = r.type_id
2113                    WHERE e1.flags = 0 AND e2.flags = 0",
2114                    vals.join(", ")
2115                );
2116                let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2117                    .flat_map(|(f, t, tp)| {
2118                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2119                    })
2120                    .collect();
2121                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2122                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2123                    .unwrap_or_else(|_| "[]".to_string())
2124            }
2125        };
2126
2127        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2128        out.push_str("{\"entities\":");
2129        out.push_str(&entities_json);
2130        out.push_str(",\"relations\":");
2131        out.push_str(&relations_json);
2132        out.push('}');
2133        Ok(out)
2134    }
2135
2136    pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2137        self.get_entity(name)?
2138            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2139    }
2140
2141    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2142        let conn = self.readers.get();
2143        select_all_types(&conn, 0).unwrap_or_default()
2144    }
2145
2146    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2147        let conn = self.readers.get();
2148        select_all_types(&conn, 1).unwrap_or_default()
2149    }
2150
2151    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2152        names
2153            .iter()
2154            .map(|n| self.get_entity(n).unwrap_or(None))
2155            .collect()
2156    }
2157
2158    pub fn find_all_paths(
2159        &self,
2160        from: &str,
2161        to: &str,
2162        max_depth: usize,
2163        max_paths: usize,
2164    ) -> Result<Vec<Vec<String>>> {
2165        let conn = self.readers.get();
2166        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2167            Some(v) => v,
2168            None => {
2169                return Err(MCSError::InvalidParams(format!(
2170                    "Source entity '{from}' not found"
2171                )))
2172            }
2173        };
2174        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2175            Some(v) => v,
2176            None => {
2177                return Err(MCSError::InvalidParams(format!(
2178                    "Target entity '{to}' not found"
2179                )))
2180            }
2181        };
2182
2183        if from_id == to_id {
2184            return Ok(vec![vec![from.to_string()]]);
2185        }
2186
2187        // BFS enumerating all paths up to max_depth.
2188        let mut all_paths: Vec<Vec<i64>> = Vec::new();
2189        let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2190        queue.push_back((from_id, vec![from_id]));
2191
2192        while let Some((cur, path)) = queue.pop_front() {
2193            if all_paths.len() >= max_paths {
2194                break;
2195            }
2196            if path.len() > max_depth {
2197                continue;
2198            }
2199
2200            // Out-neighbors.
2201            if let Ok(mut stmt) =
2202                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2203            {
2204                if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2205                    for next_id in rows.flatten() {
2206                        if next_id == to_id {
2207                            let mut full_path = path.clone();
2208                            full_path.push(next_id);
2209                            all_paths.push(full_path);
2210                            if all_paths.len() >= max_paths {
2211                                break;
2212                            }
2213                        } else if !path.contains(&next_id) && path.len() < max_depth {
2214                            let mut new_path = path.clone();
2215                            new_path.push(next_id);
2216                            queue.push_back((next_id, new_path));
2217                        }
2218                    }
2219                }
2220            }
2221
2222            // In-neighbors (undirected).
2223            if let Ok(mut stmt) =
2224                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2225            {
2226                if let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2227                    for next_id in rows.flatten() {
2228                        if next_id == to_id {
2229                            let mut full_path = path.clone();
2230                            full_path.push(next_id);
2231                            all_paths.push(full_path);
2232                            if all_paths.len() >= max_paths {
2233                                break;
2234                            }
2235                        } else if !path.contains(&next_id) && path.len() < max_depth {
2236                            let mut new_path = path.clone();
2237                            new_path.push(next_id);
2238                            queue.push_back((next_id, new_path));
2239                        }
2240                    }
2241                }
2242            }
2243        }
2244
2245        // Convert ids to names.
2246        let mut named_paths: Vec<Vec<String>> = Vec::new();
2247        for path_ids in all_paths {
2248            let mut named = Vec::with_capacity(path_ids.len());
2249            for id in path_ids {
2250                if let Ok(name) = conn.query_row(
2251                    "SELECT name FROM entity WHERE id = ?1",
2252                    params![id],
2253                    |row| row.get::<_, String>(0),
2254                ) {
2255                    named.push(name);
2256                }
2257            }
2258            named_paths.push(named);
2259        }
2260
2261        Ok(named_paths)
2262    }
2263
2264    /// Export the whole graph as a JSON string. `max_rows` caps both the entity
2265    /// and relation arrays so a pathologically large graph cannot be coerced
2266    /// into an unbounded in-memory string (DoS guard); callers pass a generous
2267    /// constant. A negative value means "no limit".
2268    pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2269        let conn = self.readers.get();
2270        // Only JSON is supported; the format argument is accepted for forward
2271        // compatibility.
2272        conn.query_row(
2273            "SELECT json_object(
2274                'entities', COALESCE((
2275                    SELECT json_group_array(json_object(
2276                        'name', e.name,
2277                        'entityType', t.name,
2278                        'observations', COALESCE((
2279                            SELECT json_group_array(o.body ORDER BY o.idx)
2280                            FROM observation o WHERE o.entity_id = e.id
2281                        ), json('[]'))
2282                    ) ORDER BY e.id)
2283                    FROM (
2284                        SELECT id, name, type_id FROM entity
2285                        WHERE flags = 0 ORDER BY id LIMIT ?1
2286                    ) e
2287                    JOIN type_dict t ON t.id = e.type_id
2288                ), json('[]')),
2289                'relations', COALESCE((
2290                    SELECT json_group_array(json_object(
2291                        'from', e1.name,
2292                        'to', e2.name,
2293                        'relationType', t.name
2294                    ))
2295                    FROM (
2296                        SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2297                    ) r
2298                    JOIN entity e1 ON e1.id = r.from_id
2299                    JOIN entity e2 ON e2.id = r.to_id
2300                    JOIN type_dict t ON t.id = r.type_id
2301                    WHERE e1.flags = 0 AND e2.flags = 0
2302                ), json('[]'))
2303            )",
2304            params![max_rows],
2305            |row| row.get::<_, String>(0),
2306        )
2307        .map_err(sqlite_err)
2308    }
2309
2310    pub fn wipe(&self) -> Result<()> {
2311        let conn = self.writer.lock();
2312        // `name_fts`/`obs_fts` are external-content FTS5 tables; the supported
2313        // way to empty them is the special `'delete-all'` command, not a bare
2314        // `DELETE FROM` (which is invalid for external-content tables and can
2315        // leave the index inconsistent). Run it after clearing the content rows.
2316        conn.execute_batch(
2317            "DELETE FROM observation;
2318             DELETE FROM relation;
2319             DELETE FROM entity;
2320             DELETE FROM type_dict;
2321             INSERT INTO name_fts(name_fts) VALUES('delete-all');
2322             INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2323             UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2324             UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2325        )
2326        .map_err(sqlite_err)?;
2327        self.seq_entity.store(0, Ordering::Relaxed);
2328        self.seq_obs.store(0, Ordering::Relaxed);
2329        self.cache.lock().clear();
2330        Ok(())
2331    }
2332
2333    /// Periodic database maintenance: WAL checkpoint, query planner analysis,
2334    /// and FTS index optimization. Call from a background timer.
2335    pub fn run_maintenance(&self) -> Result<()> {
2336        let conn = self.writer.lock();
2337
2338        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2339            .map_err(sqlite_err)?;
2340
2341        conn.execute_batch("PRAGMA optimize(0x10000);")
2342            .map_err(sqlite_err)?;
2343
2344        conn.execute_batch(
2345            "INSERT INTO name_fts(name_fts) VALUES('optimize');
2346             INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2347        )
2348        .map_err(sqlite_err)?;
2349
2350        Ok(())
2351    }
2352
2353    fn _traverse(
2354        &self,
2355        name: &str,
2356        direction: Direction,
2357        rtype: Option<&str>,
2358        depth: u32,
2359        // unused — we always include relations; the caller controls via depth
2360        _include_relations: bool,
2361    ) -> Result<String> {
2362        let conn = self.readers.get();
2363        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2364            Some(v) => v,
2365            None => {
2366                return Err(MCSError::InvalidParams(format!(
2367                    "Entity '{name}' not found"
2368                )))
2369            }
2370        };
2371
2372        let mut all_ids: HashSet<i64> = HashSet::new();
2373        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2374        let mut frontier: HashSet<i64> = HashSet::new();
2375        all_ids.insert(start_id);
2376        frontier.insert(start_id);
2377
2378        // Read-only type resolution. A requested-but-missing type uses the
2379        // sentinel id -1 (matches no edge), so traversal yields just the start
2380        // entity instead of falling back to "no type filter" and walking every
2381        // edge. `get_type_id` is avoided here: it inserts and cannot run on the
2382        // `query_only` reader connection.
2383        let type_filter: Option<i64> = rtype
2384            .filter(|rt| !rt.is_empty())
2385            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2386
2387        // Pre-compile all four possible queries outside the loop.
2388        let mut q_out_t = conn.prepare_cached(
2389            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2390        let mut q_out   = conn.prepare_cached(
2391            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2392        let mut q_in_t  = conn.prepare_cached(
2393            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2394        let mut q_in    = conn.prepare_cached(
2395            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2396
2397        let mut cur_depth = 0u32;
2398        while cur_depth < depth && !frontier.is_empty() {
2399            let mut next_frontier: HashSet<i64> = HashSet::new();
2400
2401            for &fid in &frontier {
2402                if direction == Direction::Outgoing || direction == Direction::Both {
2403                    if let Some(tid) = type_filter {
2404                        if let Ok(ref mut stmt) = q_out_t {
2405                            if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2406                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2407                            }) {
2408                                for row in rows.flatten() {
2409                                    let (to_id, t_id) = row;
2410                                    all_rels.insert((fid, to_id, t_id));
2411                                    if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2412                                }
2413                            }
2414                        }
2415                    } else if let Ok(ref mut stmt) = q_out {
2416                        if let Ok(rows) = stmt.query_map(params![fid], |row| {
2417                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2418                        }) {
2419                            for row in rows.flatten() {
2420                                let (to_id, t_id) = row;
2421                                all_rels.insert((fid, to_id, t_id));
2422                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2423                            }
2424                        }
2425                    }
2426                }
2427
2428                if direction == Direction::Incoming || direction == Direction::Both {
2429                    if let Some(tid) = type_filter {
2430                        if let Ok(ref mut stmt) = q_in_t {
2431                            if let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2432                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2433                            }) {
2434                                for row in rows.flatten() {
2435                                    let (from_id, t_id) = row;
2436                                    all_rels.insert((from_id, fid, t_id));
2437                                    if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2438                                }
2439                            }
2440                        }
2441                    } else if let Ok(ref mut stmt) = q_in {
2442                        if let Ok(rows) = stmt.query_map(params![fid], |row| {
2443                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2444                        }) {
2445                            for row in rows.flatten() {
2446                                let (from_id, t_id) = row;
2447                                all_rels.insert((from_id, fid, t_id));
2448                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2449                            }
2450                        }
2451                    }
2452                }
2453            }
2454
2455            frontier = next_frontier;
2456            cur_depth += 1;
2457        }
2458
2459        let entities_json: String = {
2460            if all_ids.is_empty() {
2461                "[]".to_string()
2462            } else {
2463                let ids: Vec<i64> = all_ids.iter().copied().collect();
2464                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2465                let sql = format!(
2466                    "SELECT COALESCE(json_group_array(json_object(
2467                        'name', e.name,
2468                        'entityType', t.name,
2469                        'observations', COALESCE((
2470                            SELECT json_group_array(o.body ORDER BY o.idx)
2471                            FROM observation o WHERE o.entity_id = e.id
2472                        ), json('[]'))
2473                    ) ORDER BY e.id), json('[]'))
2474                    FROM entity e
2475                    JOIN type_dict t ON t.id = e.type_id
2476                    WHERE e.id IN ({}) AND e.flags = 0",
2477                    placeholders.join(",")
2478                );
2479                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2480                    row.get::<_, String>(0)
2481                })
2482                .unwrap_or_else(|_| "[]".to_string())
2483            }
2484        };
2485
2486        let relations_json: String = {
2487            if all_rels.is_empty() {
2488                "[]".to_string()
2489            } else {
2490                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2491                let sql = format!(
2492                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2493                    SELECT COALESCE(json_group_array(json_object(
2494                        'from', e1.name,
2495                        'to', e2.name,
2496                        'relationType', t.name
2497                    )), json('[]'))
2498                    FROM r
2499                    JOIN entity e1 ON e1.id = r.from_id
2500                    JOIN entity e2 ON e2.id = r.to_id
2501                    JOIN type_dict t ON t.id = r.type_id
2502                    WHERE e1.flags = 0 AND e2.flags = 0",
2503                    vals.join(", ")
2504                );
2505                let params: Vec<&dyn ToSql> = all_rels.iter()
2506                    .flat_map(|(f, t, tp)| {
2507                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2508                    })
2509                    .collect();
2510                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2511                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2512                    .unwrap_or_else(|_| "[]".to_string())
2513            }
2514        };
2515
2516        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2517        out.push_str("{\"entities\":");
2518        out.push_str(&entities_json);
2519        out.push_str(",\"relations\":");
2520        out.push_str(&relations_json);
2521        out.push('}');
2522        Ok(out)
2523    }
2524}
2525
2526// ── Tests ────────────────────────────────────────────────────────────────
2527
2528#[cfg(test)]
2529mod tests {
2530    use super::*;
2531    use serde_json::Value;
2532    use std::ops::Deref;
2533    use std::path::PathBuf;
2534
2535    struct TestKg(GraphHandle, PathBuf);
2536
2537    impl Deref for TestKg {
2538        type Target = GraphHandle;
2539        fn deref(&self) -> &GraphHandle {
2540            &self.0
2541        }
2542    }
2543
2544    impl Drop for TestKg {
2545        fn drop(&mut self) {
2546            cleanup_db(&self.1);
2547        }
2548    }
2549
2550    fn cleanup_db(path: &std::path::Path) {
2551        let _ = std::fs::remove_file(path);
2552        let _ = std::fs::remove_file(path.with_extension("db-wal"));
2553        let _ = std::fs::remove_file(path.with_extension("db-shm"));
2554    }
2555
2556    fn new_kg() -> TestKg {
2557        use std::sync::atomic::AtomicU64;
2558        use std::sync::atomic::Ordering;
2559        static COUNTER: AtomicU64 = AtomicU64::new(0);
2560        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2561        let dir = std::env::temp_dir();
2562        let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2563        cleanup_db(&path);
2564        let kg = GraphHandle::new(&path, Durability::Async, 268435456, NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2565        TestKg(kg, path)
2566    }
2567
2568    #[test]
2569    fn test_create_and_get_entity() {
2570        let kg = new_kg();
2571        let entities = vec![Entity {
2572            name: "test".into(),
2573            entity_type: "person".into(),
2574            observations: vec!["obs1".into(), "obs2".into()],
2575        }];
2576        let created = kg.create_entities(&entities).unwrap();
2577        assert_eq!(created.len(), 1);
2578
2579        let got = kg.get_entity("test").unwrap().unwrap();
2580        assert_eq!(got.name, "test");
2581        assert_eq!(got.entity_type, "person");
2582        assert_eq!(got.observations, vec!["obs1", "obs2"]);
2583    }
2584
2585    #[test]
2586    fn test_get_nonexistent() {
2587        let kg = new_kg();
2588        assert!(kg.get_entity("nonexistent").unwrap().is_none());
2589    }
2590
2591    #[test]
2592    fn test_delete_entity() {
2593        let kg = new_kg();
2594        kg.create_entities(&[Entity {
2595            name: "del".into(),
2596            entity_type: "t".into(),
2597            observations: vec![],
2598        }])
2599        .unwrap();
2600        assert!(kg.get_entity("del").unwrap().is_some());
2601        kg.delete_entities(&["del".to_string()]).unwrap();
2602        assert!(kg.get_entity("del").unwrap().is_none());
2603    }
2604
2605    #[test]
2606    fn test_add_and_delete_observations() {
2607        let kg = new_kg();
2608        kg.create_entities(&[Entity {
2609            name: "obs_test".into(),
2610            entity_type: "t".into(),
2611            observations: vec!["a".into()],
2612        }])
2613        .unwrap();
2614
2615        let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2616        assert_eq!(added.len(), 2);
2617
2618        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2619        assert!(ent.observations.contains(&"b".into()));
2620        assert!(ent.observations.contains(&"c".into()));
2621
2622        kg.delete_observations("obs_test", &["b".into()]).unwrap();
2623        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2624        assert!(!ent.observations.contains(&"b".into()));
2625        assert!(ent.observations.contains(&"c".into()));
2626        assert!(ent.observations.contains(&"a".into()));
2627    }
2628
2629    #[test]
2630    fn test_create_relations() {
2631        let kg = new_kg();
2632        kg.create_entities(&[
2633            Entity {
2634                name: "A".into(),
2635                entity_type: "node".into(),
2636                observations: vec![],
2637            },
2638            Entity {
2639                name: "B".into(),
2640                entity_type: "node".into(),
2641                observations: vec![],
2642            },
2643        ])
2644        .unwrap();
2645
2646        let rels = kg
2647            .create_relations(&[Relation {
2648                from: "A".into(),
2649                to: "B".into(),
2650                relation_type: "edge".into(),
2651            }])
2652            .unwrap();
2653        assert_eq!(rels.len(), 1);
2654
2655        assert_eq!(kg.get_entity_count().unwrap(), 2);
2656        assert_eq!(kg.get_relation_count().unwrap(), 1);
2657    }
2658
2659    #[test]
2660    fn test_search_nodes() {
2661        let kg = new_kg();
2662        kg.create_entities(&[Entity {
2663            name: "Einstein".into(),
2664            entity_type: "scientist".into(),
2665            observations: vec!["physics".into(), "relativity".into()],
2666        }])
2667        .unwrap();
2668
2669        let results = kg.search_nodes_filtered("physics", None, 0, 10);
2670        assert_eq!(results.len(), 1);
2671        assert_eq!(results[0].name, "Einstein");
2672
2673        let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2674        assert_eq!(results.len(), 1);
2675
2676        let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2677        assert_eq!(results.len(), 0);
2678    }
2679
2680    #[test]
2681    fn test_find_path() {
2682        let kg = new_kg();
2683        kg.create_entities(&[
2684            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2685            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2686            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2687        ]).unwrap();
2688
2689        kg.create_relations(&[
2690            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2691            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2692        ]).unwrap();
2693
2694        let path = kg.find_path("A", "C").unwrap().unwrap();
2695        assert_eq!(path, vec!["A", "B", "C"]);
2696    }
2697
2698    #[test]
2699    fn test_degree() {
2700        let kg = new_kg();
2701        kg.create_entities(&[
2702            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2703            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2704            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2705        ]).unwrap();
2706
2707        kg.create_relations(&[
2708            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2709            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2710        ]).unwrap();
2711
2712        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2713        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2714        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2715    }
2716
2717    #[test]
2718    fn test_neighbors() {
2719        let kg = new_kg();
2720        kg.create_entities(&[
2721            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2722            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2723        ]).unwrap();
2724
2725        kg.create_relations(&[Relation {
2726            from: "A".into(), to: "B".into(), relation_type: "e".into(),
2727        }]).unwrap();
2728
2729        let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2730        let v: Value = serde_json::from_str(&result).unwrap();
2731        assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2732        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2733    }
2734
2735    #[test]
2736    fn test_open_nodes() {
2737        let kg = new_kg();
2738        kg.create_entities(&[
2739            Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2740            Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2741        ]).unwrap();
2742
2743        kg.create_relations(&[Relation {
2744            from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2745        }]).unwrap();
2746
2747        let result = kg.open_nodes(&["X".into()]);
2748        let v: Value = serde_json::from_str(&result).unwrap();
2749        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2750        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2751    }
2752
2753    #[test]
2754    fn test_entities_exist() {
2755        let kg = new_kg();
2756        kg.create_entities(&[Entity {
2757            name: "exists".into(), entity_type: "t".into(), observations: vec![],
2758        }]).unwrap();
2759
2760        let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2761        assert_eq!(res, vec![true, false]);
2762    }
2763
2764    #[test]
2765    fn test_describe_entity() {
2766        let kg = new_kg();
2767        kg.create_entities(&[Entity {
2768            name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2769        }]).unwrap();
2770
2771        let entity = kg.describe_entity("desc").unwrap();
2772        assert_eq!(entity.name, "desc");
2773    }
2774
2775    #[test]
2776    fn test_entity_type_counts() {
2777        let kg = new_kg();
2778        kg.create_entities(&[
2779            Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2780            Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2781            Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2782        ]).unwrap();
2783
2784        let counts = kg.entity_type_counts();
2785        let map: FxHashMap<_, _> = counts.into_iter().collect();
2786        assert_eq!(map.get("person"), Some(&2));
2787        assert_eq!(map.get("place"), Some(&1));
2788    }
2789
2790    #[test]
2791    fn test_relation_type_counts() {
2792        let kg = new_kg();
2793        kg.create_entities(&[
2794            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2795            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2796            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2797        ]).unwrap();
2798
2799        kg.create_relations(&[
2800            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2801            Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2802        ]).unwrap();
2803
2804        let counts = kg.relation_type_counts();
2805        let map: FxHashMap<_, _> = counts.into_iter().collect();
2806        assert_eq!(map.get("knows"), Some(&2));
2807    }
2808
2809    #[test]
2810    fn test_upsert_entities() {
2811        let kg = new_kg();
2812        kg.create_entities(&[Entity {
2813            name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2814        }]).unwrap();
2815
2816        // Upsert with new type and additional observation.
2817        kg.upsert_entities(&[Entity {
2818            name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2819        }]).unwrap();
2820
2821        let ent = kg.get_entity("u").unwrap().unwrap();
2822        assert_eq!(ent.entity_type, "new");
2823        assert!(ent.observations.contains(&"added".into()));
2824        assert!(ent.observations.contains(&"existing".into()));
2825    }
2826
2827    #[test]
2828    fn test_merge_entities() {
2829        let kg = new_kg();
2830        kg.create_entities(&[
2831            Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2832            Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2833        ]).unwrap();
2834
2835        kg.create_relations(&[Relation {
2836            from: "source".into(), to: "target".into(), relation_type: "e".into(),
2837        }]).unwrap();
2838
2839        let merged = kg.merge_entities("source", "target").unwrap();
2840        assert_eq!(merged.name, "target");
2841        assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2842    }
2843
2844    #[test]
2845    fn test_find_all_paths() {
2846        let kg = new_kg();
2847        kg.create_entities(&[
2848            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2849            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2850            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2851        ]).unwrap();
2852
2853        kg.create_relations(&[
2854            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2855            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2856            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2857        ]).unwrap();
2858
2859        let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2860        assert!(paths.len() >= 2);
2861    }
2862
2863    #[test]
2864    fn test_batch_get_entities() {
2865        let kg = new_kg();
2866        kg.create_entities(&[
2867            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2868            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2869        ]).unwrap();
2870
2871        let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2872        assert_eq!(results.len(), 3);
2873        assert!(results[0].is_some());
2874        assert!(results[1].is_none());
2875        assert!(results[2].is_some());
2876    }
2877
2878    #[test]
2879    fn test_export_graph() {
2880        let kg = new_kg();
2881        kg.create_entities(&[Entity {
2882            name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2883        }]).unwrap();
2884
2885        let exported = kg.export("json", i64::MAX).unwrap();
2886        assert!(exported.contains("exp"));
2887        assert!(exported.contains("o"));
2888    }
2889
2890    #[test]
2891    fn test_graph_stats() {
2892        let kg = new_kg();
2893        assert_eq!(kg.get_entity_count().unwrap(), 0);
2894        assert_eq!(kg.get_relation_count().unwrap(), 0);
2895
2896        kg.create_entities(&[Entity {
2897            name: "s".into(), entity_type: "t".into(), observations: vec![],
2898        }]).unwrap();
2899
2900        assert_eq!(kg.get_entity_count().unwrap(), 1);
2901    }
2902
2903    #[test]
2904    fn test_read_graph_filtered() {
2905        let kg = new_kg();
2906        kg.create_entities(&[
2907            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2908            Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2909        ]).unwrap();
2910
2911        let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2912        let v: Value = serde_json::from_str(&out).unwrap();
2913        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2914        assert_eq!(v["entities"][0]["name"], "p1");
2915    }
2916
2917    #[test]
2918    fn test_wipe() {
2919        let kg = new_kg();
2920        kg.create_entities(&[Entity {
2921            name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2922        }]).unwrap();
2923        assert_eq!(kg.get_entity_count().unwrap(), 1);
2924
2925        kg.wipe().unwrap();
2926        assert_eq!(kg.get_entity_count().unwrap(), 0);
2927    }
2928
2929    #[test]
2930    fn test_push_json_str() {
2931        let mut buf = String::new();
2932        push_json_str(&mut buf, "hello");
2933        assert_eq!(buf, "\"hello\"");
2934        let mut buf = String::new();
2935        push_json_str(&mut buf, "he\"llo");
2936        assert_eq!(buf, "\"he\\\"llo\"");
2937    }
2938
2939    // ── create_entities edge cases ────────────────────────────────────
2940
2941    #[test]
2942    fn test_create_entities_empty_input() {
2943        let kg = new_kg();
2944        let created = kg.create_entities(&[]).unwrap();
2945        assert!(created.is_empty());
2946    }
2947
2948    #[test]
2949    fn test_create_entities_skip_empty_name() {
2950        let kg = new_kg();
2951        let created = kg.create_entities(&[Entity {
2952            name: "".into(),
2953            entity_type: "t".into(),
2954            observations: vec![],
2955        }])
2956        .unwrap();
2957        assert!(created.is_empty());
2958        assert_eq!(kg.get_entity_count().unwrap(), 0);
2959    }
2960
2961    #[test]
2962    fn test_create_entities_duplicate_names() {
2963        let kg = new_kg();
2964        let e = Entity {
2965            name: "dup".into(),
2966            entity_type: "t".into(),
2967            observations: vec!["obs".into()],
2968        };
2969        let first = kg.create_entities(&[e.clone()]).unwrap();
2970        assert_eq!(first.len(), 1);
2971        let second = kg.create_entities(&[e.clone()]).unwrap();
2972        assert!(second.is_empty());
2973        assert_eq!(kg.get_entity_count().unwrap(), 1);
2974    }
2975
2976    #[test]
2977    fn test_create_entities_partial_duplicates() {
2978        let kg = new_kg();
2979        let created = kg.create_entities(&[
2980            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2981            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2982        ]).unwrap();
2983        assert_eq!(created.len(), 2);
2984
2985        let second = kg.create_entities(&[
2986            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2987            Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
2988        ]).unwrap();
2989        assert_eq!(second.len(), 1); // only c created
2990        assert_eq!(second[0].name, "c");
2991        assert_eq!(kg.get_entity_count().unwrap(), 3);
2992    }
2993
2994    #[test]
2995    fn test_create_entities_mixed_empty_and_valid() {
2996        let kg = new_kg();
2997        let created = kg.create_entities(&[
2998            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
2999            Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3000            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3001        ]).unwrap();
3002        assert_eq!(created.len(), 1);
3003        assert_eq!(created[0].name, "valid");
3004        assert_eq!(kg.get_entity_count().unwrap(), 1);
3005    }
3006
3007    #[test]
3008    fn test_create_entities_same_name_in_batch() {
3009        let kg = new_kg();
3010        let created = kg.create_entities(&[
3011            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3012            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3013        ]).unwrap();
3014        assert_eq!(created.len(), 1);
3015        assert_eq!(kg.get_entity_count().unwrap(), 1);
3016    }
3017
3018    // ── create_relations edge cases ───────────────────────────────────
3019
3020    #[test]
3021    fn test_create_relations_empty_input() {
3022        let kg = new_kg();
3023        let rels = kg.create_relations(&[]).unwrap();
3024        assert!(rels.is_empty());
3025    }
3026
3027    #[test]
3028    fn test_create_relations_nonexistent_from() {
3029        let kg = new_kg();
3030        kg.create_entities(&[Entity {
3031            name: "B".into(), entity_type: "t".into(), observations: vec![],
3032        }]).unwrap();
3033
3034        let rels = kg.create_relations(&[Relation {
3035            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3036        }]).unwrap();
3037        assert!(rels.is_empty());
3038        assert_eq!(kg.get_relation_count().unwrap(), 0);
3039    }
3040
3041    #[test]
3042    fn test_create_relations_nonexistent_to() {
3043        let kg = new_kg();
3044        kg.create_entities(&[Entity {
3045            name: "A".into(), entity_type: "t".into(), observations: vec![],
3046        }]).unwrap();
3047
3048        let rels = kg.create_relations(&[Relation {
3049            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3050        }]).unwrap();
3051        assert!(rels.is_empty());
3052        assert_eq!(kg.get_relation_count().unwrap(), 0);
3053    }
3054
3055    #[test]
3056    fn test_create_relations_both_nonexistent() {
3057        let kg = new_kg();
3058        let rels = kg.create_relations(&[Relation {
3059            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3060        }]).unwrap();
3061        assert!(rels.is_empty());
3062    }
3063
3064    #[test]
3065    fn test_create_relations_self_loop() {
3066        let kg = new_kg();
3067        kg.create_entities(&[Entity {
3068            name: "self".into(), entity_type: "t".into(), observations: vec![],
3069        }]).unwrap();
3070
3071        let rels = kg.create_relations(&[Relation {
3072            from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3073        }]).unwrap();
3074        assert_eq!(rels.len(), 1);
3075        assert_eq!(kg.get_relation_count().unwrap(), 1);
3076        assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3077        assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3078    }
3079
3080    #[test]
3081    fn test_create_relations_duplicate() {
3082        let kg = new_kg();
3083        kg.create_entities(&[
3084            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3085            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3086        ]).unwrap();
3087
3088        let r = Relation {
3089            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3090        };
3091        let first = kg.create_relations(&[r.clone()]).unwrap();
3092        assert_eq!(first.len(), 1);
3093
3094        let second = kg.create_relations(&[r.clone()]).unwrap();
3095        assert!(second.is_empty());
3096        assert_eq!(kg.get_relation_count().unwrap(), 1);
3097    }
3098
3099    #[test]
3100    fn test_create_relations_new_type_auto_created() {
3101        let kg = new_kg();
3102        kg.create_entities(&[
3103            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3104            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3105        ]).unwrap();
3106
3107        let rels = kg.create_relations(&[Relation {
3108            from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3109        }]).unwrap();
3110        assert_eq!(rels.len(), 1);
3111
3112        let counts = kg.relation_type_counts();
3113        let map: FxHashMap<_, _> = counts.into_iter().collect();
3114        assert_eq!(map.get("brand_new_type"), Some(&1));
3115    }
3116
3117    #[test]
3118    fn test_create_relations_degree_updates() {
3119        let kg = new_kg();
3120        kg.create_entities(&[
3121            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3122            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3123            Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3124        ]).unwrap();
3125
3126        kg.create_relations(&[
3127            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3128            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3129        ]).unwrap();
3130
3131        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3132        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3133        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3134        assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3135        assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3136    }
3137
3138    #[test]
3139    fn test_create_relations_delete_and_recreate() {
3140        let kg = new_kg();
3141        kg.create_entities(&[
3142            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3143            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3144        ]).unwrap();
3145
3146        let r = Relation {
3147            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3148        };
3149        kg.create_relations(&[r.clone()]).unwrap();
3150        assert_eq!(kg.get_relation_count().unwrap(), 1);
3151
3152        kg.delete_relations(&[r.clone()]).unwrap();
3153        assert_eq!(kg.get_relation_count().unwrap(), 0);
3154
3155        // Recreate after delete
3156        let re = kg.create_relations(&[r.clone()]).unwrap();
3157        assert_eq!(re.len(), 1);
3158        assert_eq!(kg.get_relation_count().unwrap(), 1);
3159    }
3160
3161    // ── Integration edge cases ────────────────────────────────────────
3162
3163    #[test]
3164    fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3165        let kg = new_kg();
3166        kg.create_entities(&[
3167            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3168            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3169        ]).unwrap();
3170        kg.create_relations(&[
3171            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3172        ]).unwrap();
3173
3174        assert_eq!(kg.get_relation_count().unwrap(), 1);
3175
3176        // Deleting entity A should also delete the relation
3177        kg.delete_entities(&["A".into()]).unwrap();
3178        assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3179        assert_eq!(kg.get_relation_count().unwrap(), 0);
3180    }
3181
3182    #[test]
3183    fn test_graph_stats_after_entity_with_observations() {
3184        let kg = new_kg();
3185        kg.create_entities(&[Entity {
3186            name: "stat".into(), entity_type: "t".into(),
3187            observations: vec!["o1".into(), "o2".into(), "o3".into()],
3188        }]).unwrap();
3189
3190        let ecount = kg.get_entity_count().unwrap();
3191        // graph_stat for observations is tracked but there's no public getter for it
3192        assert_eq!(ecount, 1);
3193
3194        // delete reverts stats
3195        kg.delete_entities(&["stat".into()]).unwrap();
3196        assert_eq!(kg.get_entity_count().unwrap(), 0);
3197    }
3198
3199    // ── Helpers for the fix-specific suites ────────────────────────────────
3200
3201    fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3202        use std::sync::atomic::AtomicU64;
3203        static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3204        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3205        let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3206        cleanup_db(&path);
3207        let kg = GraphHandle::new(
3208            &path,
3209            Durability::Async,
3210            268_435_456,
3211            NonZeroUsize::new(10_000).unwrap(),
3212            read_pool_size,
3213        )
3214        .expect("create KG");
3215        TestKg(kg, path)
3216    }
3217
3218    fn seed_line(kg: &GraphHandle, n: usize) {
3219        let entities: Vec<Entity> = (0..n)
3220            .map(|i| Entity {
3221                name: format!("n{i}"),
3222                entity_type: "node".into(),
3223                observations: vec![format!("obs of n{i}")],
3224            })
3225            .collect();
3226        kg.create_entities(&entities).unwrap();
3227        let rels: Vec<Relation> = (0..n.saturating_sub(1))
3228            .map(|i| Relation {
3229                from: format!("n{i}"),
3230                to: format!("n{}", i + 1),
3231                relation_type: "edge".into(),
3232            })
3233            .collect();
3234        if !rels.is_empty() {
3235            kg.create_relations(&rels).unwrap();
3236        }
3237    }
3238
3239    fn count_relations(graph_json: &str) -> usize {
3240        let v: Value = serde_json::from_str(graph_json).unwrap();
3241        v["relations"].as_array().unwrap().len()
3242    }
3243
3244    fn count_entities(graph_json: &str) -> usize {
3245        let v: Value = serde_json::from_str(graph_json).unwrap();
3246        v["entities"].as_array().unwrap().len()
3247    }
3248
3249    // ── Fix #1: reader pool / concurrency ──────────────────────────────────
3250
3251    #[test]
3252    fn test_pool_size_one_still_works() {
3253        let kg = new_kg_with_pool(1);
3254        seed_line(&kg, 5);
3255        assert_eq!(kg.get_entity_count().unwrap(), 5);
3256        assert!(kg.get_entity("n2").unwrap().is_some());
3257        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3258        assert_eq!(count_entities(&g), 5);
3259    }
3260
3261    #[test]
3262    fn test_reads_see_committed_writes() {
3263        // A read on a pool connection must observe a just-committed write made on
3264        // the writer connection (WAL visibility).
3265        let kg = new_kg_with_pool(4);
3266        kg.create_entities(&[Entity {
3267            name: "fresh".into(),
3268            entity_type: "t".into(),
3269            observations: vec!["v".into()],
3270        }])
3271        .unwrap();
3272        // get_entity goes through the reader pool.
3273        let got = kg.get_entity("fresh").unwrap().unwrap();
3274        assert_eq!(got.observations, vec!["v"]);
3275    }
3276
3277    #[test]
3278    fn test_concurrent_readers_consistent() {
3279        // Many readers hammering the pool while the writer mutates must never
3280        // panic, deadlock, or observe a torn graph. The final counts must match.
3281        let kg = new_kg_with_pool(4);
3282        seed_line(&kg, 50);
3283
3284        std::thread::scope(|s| {
3285            // 8 reader threads.
3286            for _ in 0..8 {
3287                s.spawn(|| {
3288                    for _ in 0..200 {
3289                        let _ = kg.get_entity("n10");
3290                        let _ = kg.search_nodes_filtered("obs", None, 0, 10);
3291                        let _ = kg.read_graph_filtered(None, 0, 100);
3292                        let _ = kg.get_entity_count();
3293                        let _ = kg.neighbors("n10", Direction::Both, None, 2);
3294                    }
3295                });
3296            }
3297            // 1 writer thread adding more entities concurrently.
3298            s.spawn(|| {
3299                for i in 100..160 {
3300                    kg.create_entities(&[Entity {
3301                        name: format!("w{i}"),
3302                        entity_type: "node".into(),
3303                        observations: vec![format!("w obs {i}")],
3304                    }])
3305                    .unwrap();
3306                }
3307            });
3308        });
3309
3310        // 50 seeded + 60 written.
3311        assert_eq!(kg.get_entity_count().unwrap(), 110);
3312        assert!(kg.get_entity("w159").unwrap().is_some());
3313    }
3314
3315    #[test]
3316    fn test_reader_pool_rejects_writes_internally() {
3317        // Sanity: query_only readers cannot mutate. We can't call a write through
3318        // the pool directly, but we can confirm a read method that *would* have
3319        // inserted (search_relations resolving a missing type) does not create a
3320        // phantom type — see the dedicated test below — and that reads under a
3321        // size-1 pool serialize correctly without deadlock.
3322        let kg = new_kg_with_pool(1);
3323        seed_line(&kg, 3);
3324        std::thread::scope(|s| {
3325            for _ in 0..4 {
3326                s.spawn(|| {
3327                    for _ in 0..100 {
3328                        let _ = kg.read_graph_filtered(None, 0, 10);
3329                    }
3330                });
3331            }
3332        });
3333        assert_eq!(kg.get_entity_count().unwrap(), 3);
3334    }
3335
3336    // ── Fix #6: read_graph relation scoping + export bound ─────────────────
3337
3338    #[test]
3339    fn test_read_graph_relations_scoped_to_page() {
3340        let kg = new_kg_with_pool(2);
3341        // n0 -> n1 -> n2 -> n3 (4 entities, 3 edges).
3342        seed_line(&kg, 4);
3343
3344        // Full page: all 3 edges present.
3345        let full = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3346        assert_eq!(count_entities(&full), 4);
3347        assert_eq!(count_relations(&full), 3);
3348
3349        // Page of only the first entity (n0): its only edge n0->n1 has an
3350        // endpoint (n1) outside the page, so no relations are returned.
3351        let page1 = kg.read_graph_filtered(None, 0, 1).unwrap();
3352        assert_eq!(count_entities(&page1), 1);
3353        assert_eq!(count_relations(&page1), 0);
3354
3355        // Page of first two entities (n0, n1): edge n0->n1 fully inside, n1->n2
3356        // straddles the boundary and is excluded.
3357        let page2 = kg.read_graph_filtered(None, 0, 2).unwrap();
3358        assert_eq!(count_entities(&page2), 2);
3359        assert_eq!(count_relations(&page2), 1);
3360    }
3361
3362    #[test]
3363    fn test_read_graph_pagination_offset() {
3364        let kg = new_kg_with_pool(2);
3365        seed_line(&kg, 5);
3366        let g = kg.read_graph_filtered(None, 2, 2).unwrap();
3367        assert_eq!(count_entities(&g), 2);
3368        // Entities are ordered by id; offset 2 skips n0, n1.
3369        assert!(!g.contains("\"n0\""));
3370        assert!(!g.contains("\"n1\""));
3371        assert!(g.contains("\"n2\""));
3372        assert!(g.contains("\"n3\""));
3373    }
3374
3375    #[test]
3376    fn test_read_graph_empty() {
3377        let kg = new_kg_with_pool(2);
3378        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3379        assert_eq!(g, r#"{"entities":[],"relations":[]}"#);
3380    }
3381
3382    #[test]
3383    fn test_read_graph_filtered_by_type() {
3384        let kg = new_kg_with_pool(2);
3385        kg.create_entities(&[
3386            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
3387            Entity { name: "q1".into(), entity_type: "place".into(), observations: vec![] },
3388            Entity { name: "p2".into(), entity_type: "person".into(), observations: vec![] },
3389        ])
3390        .unwrap();
3391        let g = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
3392        assert_eq!(count_entities(&g), 2);
3393        assert!(g.contains("\"p1\""));
3394        assert!(g.contains("\"p2\""));
3395        assert!(!g.contains("\"q1\""));
3396    }
3397
3398    #[test]
3399    fn test_export_respects_max_rows() {
3400        let kg = new_kg_with_pool(2);
3401        seed_line(&kg, 5);
3402
3403        // Unbounded export returns everything.
3404        let full = kg.export("json", i64::MAX).unwrap();
3405        assert_eq!(count_entities(&full), 5);
3406        assert_eq!(count_relations(&full), 4);
3407
3408        // Capped export truncates both arrays to the cap.
3409        let capped = kg.export("json", 2).unwrap();
3410        assert_eq!(count_entities(&capped), 2);
3411        assert_eq!(count_relations(&capped), 2);
3412    }
3413
3414    #[test]
3415    fn test_export_negative_max_rows_is_unbounded() {
3416        let kg = new_kg_with_pool(2);
3417        seed_line(&kg, 3);
3418        // SQLite treats a negative LIMIT as "no limit".
3419        let out = kg.export("json", -1).unwrap();
3420        assert_eq!(count_entities(&out), 3);
3421    }
3422
3423    // ── Fix #8: writes remain correct without the per-write PRAGMA optimize ─
3424
3425    #[test]
3426    fn test_many_small_write_batches_stay_consistent() {
3427        let kg = new_kg_with_pool(2);
3428        for i in 0..100 {
3429            kg.create_entities(&[Entity {
3430                name: format!("e{i}"),
3431                entity_type: "t".into(),
3432                observations: vec![format!("o{i}")],
3433            }])
3434            .unwrap();
3435        }
3436        assert_eq!(kg.get_entity_count().unwrap(), 100);
3437        // Search must still find a needle inserted across many tiny batches,
3438        // proving FTS stayed consistent without per-write optimization.
3439        let hits = kg.search_nodes_filtered("e57", None, 0, 10);
3440        assert!(hits.iter().any(|e| e.name == "e57"));
3441    }
3442
3443    // ── Fix #9: wipe fully resets the FTS indexes ──────────────────────────
3444
3445    #[test]
3446    fn test_wipe_clears_name_and_obs_fts() {
3447        let kg = new_kg_with_pool(2);
3448        kg.create_entities(&[Entity {
3449            name: "Einstein".into(),
3450            entity_type: "scientist".into(),
3451            observations: vec!["physics".into()],
3452        }])
3453        .unwrap();
3454
3455        // Both FTS indexes resolve before the wipe.
3456        assert_eq!(kg.search_nodes_filtered("Einstein", None, 0, 10).len(), 1);
3457        assert_eq!(kg.search_nodes_filtered("physics", None, 0, 10).len(), 1);
3458
3459        kg.wipe().unwrap();
3460
3461        // After wipe both indexes must be empty (a bare DELETE on an
3462        // external-content FTS5 table would have left stale rowids behind).
3463        assert_eq!(kg.get_entity_count().unwrap(), 0);
3464        assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
3465        assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
3466    }
3467
3468    #[test]
3469    fn test_wipe_then_recreate_search_works() {
3470        // Recreating the same names after a wipe must produce a clean, searchable
3471        // index — not a corrupted one or duplicate FTS rows.
3472        let kg = new_kg_with_pool(2);
3473        kg.create_entities(&[Entity {
3474            name: "Einstein".into(),
3475            entity_type: "scientist".into(),
3476            observations: vec!["physics".into()],
3477        }])
3478        .unwrap();
3479        kg.wipe().unwrap();
3480
3481        kg.create_entities(&[Entity {
3482            name: "Einstein".into(),
3483            entity_type: "scientist".into(),
3484            observations: vec!["physics".into(), "relativity".into()],
3485        }])
3486        .unwrap();
3487
3488        let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
3489        assert_eq!(by_name.len(), 1, "exactly one Einstein after recreate");
3490        let by_obs = kg.search_nodes_filtered("relativity", None, 0, 10);
3491        assert_eq!(by_obs.len(), 1);
3492        assert_eq!(kg.get_entity_count().unwrap(), 1);
3493    }
3494
3495    // ── Read-only type/entity resolution (introduced by the reader pool) ───
3496
3497    #[test]
3498    fn test_search_relations_missing_type_returns_empty() {
3499        let kg = new_kg_with_pool(2);
3500        seed_line(&kg, 3); // edges of type "edge"
3501        // A filter for a relation type that does not exist must return nothing,
3502        // not every relation — and must not create a phantom type row.
3503        let r = kg.search_relations(None, None, Some("does_not_exist"));
3504        assert!(r.is_empty());
3505        // The phantom type must not have been inserted by the read.
3506        let types = kg.relation_type_counts();
3507        assert!(types.iter().all(|(t, _)| t != "does_not_exist"));
3508    }
3509
3510    #[test]
3511    fn test_search_relations_missing_from_returns_empty() {
3512        let kg = new_kg_with_pool(2);
3513        seed_line(&kg, 3);
3514        let r = kg.search_relations(Some("ghost"), None, None);
3515        assert!(r.is_empty(), "missing 'from' must not match every relation");
3516    }
3517
3518    #[test]
3519    fn test_search_relations_existing_filters_still_work() {
3520        let kg = new_kg_with_pool(2);
3521        seed_line(&kg, 3);
3522        let r = kg.search_relations(Some("n0"), None, Some("edge"));
3523        assert_eq!(r.len(), 1);
3524        assert_eq!(r[0].from, "n0");
3525        assert_eq!(r[0].to, "n1");
3526    }
3527
3528    #[test]
3529    fn test_neighbors_missing_type_returns_only_start() {
3530        let kg = new_kg_with_pool(2);
3531        seed_line(&kg, 3);
3532        let json = kg
3533            .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
3534            .unwrap();
3535        // No edge matches the bogus type, so only the start node comes back.
3536        assert_eq!(count_entities(&json), 1);
3537        assert_eq!(count_relations(&json), 0);
3538    }
3539
3540    #[test]
3541    fn test_neighbors_existing_type_filters() {
3542        let kg = new_kg_with_pool(2);
3543        kg.create_entities(&[
3544            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
3545            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
3546            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
3547        ])
3548        .unwrap();
3549        kg.create_relations(&[
3550            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
3551            Relation { from: "a".into(), to: "c".into(), relation_type: "likes".into() },
3552        ])
3553        .unwrap();
3554        let json = kg.neighbors("a", Direction::Outgoing, Some("knows"), 1).unwrap();
3555        assert!(json.contains("\"b\""));
3556        assert!(!json.contains("\"c\""));
3557        assert_eq!(count_relations(&json), 1);
3558    }
3559}