Skip to main content

mcp_memory/
kg.rs

1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::Durability;
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17    MCSError::IoError(std::io::Error::other(e))
18}
19
20const fn is_not_found(e: &rusqlite::Error) -> bool {
21    matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch the elapsed duration
/// falls back to zero, so the function returns `0` instead of failing.
#[inline(always)]
fn now_us() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_micros() as i64
}
31
/// 64-bit FNV-1a hash of `name`'s UTF-8 bytes, with the resulting bit pattern
/// reinterpreted as an `i64` (so the value may be negative).
///
/// The empty string hashes to the FNV offset basis.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let digest = name
        .as_bytes()
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43    let mut stmt = conn
44        .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45        .map_err(sqlite_err)?;
46    let rows = stmt
47        .query_map(params![entity_id], |row| row.get::<_, String>(0))
48        .map_err(sqlite_err)?
49        .filter_map(|r| r.ok())
50        .collect::<Vec<_>>();
51    Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55    load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59    let h = name_hash(name);
60    let mut stmt = conn
61        .prepare_cached(
62            "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63        )
64        .map_err(sqlite_err)?;
65    match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66        Ok(id) => Ok(Some(id)),
67        Err(e) if is_not_found(&e) => Ok(None),
68        Err(e) => Err(sqlite_err(e)),
69    }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73    let mut sel = conn
74        .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75        .map_err(sqlite_err)?;
76    if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77        return Ok(id);
78    }
79    conn.execute(
80        "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81        params![kind, type_name],
82    )
83    .map_err(sqlite_err)?;
84    Ok(conn.last_insert_rowid())
85}
86
87/// Read-only type lookup. Unlike [`get_type_id`] this never inserts, so it is
88/// safe to call on a `query_only` reader connection. Returns `None` when the
89/// type does not exist.
90fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91    conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92        .ok()?
93        .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94        .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98    conn.execute(
99        "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100        params![delta, type_id],
101    )
102    .map_err(sqlite_err)?;
103    Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107    conn.execute(
108        "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109        params![delta, key],
110    )
111    .map_err(sqlite_err)?;
112    Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116    conn.query_row(
117        "SELECT value FROM graph_stat WHERE key = ?1",
118        params![key],
119        |row| row.get(0),
120    )
121    .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125    conn.query_row(
126        "SELECT name FROM type_dict WHERE id = ?1",
127        params![type_id],
128        |row| row.get(0),
129    )
130    .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134    let mut stmt = conn
135        .prepare_cached(
136            "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137        )
138        .map_err(sqlite_err)?;
139    let rows = stmt
140        .query_map(params![kind], |row| {
141            Ok((
142                row.get::<_, String>(0)?,
143                row.get::<_, i64>(1)? as usize,
144            ))
145        })
146        .map_err(sqlite_err)?
147        .filter_map(|r| r.ok())
148        .collect();
149    Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153    let mut stmt = conn
154        .prepare_cached(
155            "SELECT e.name, t.name,
156               COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157             FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158        )
159        .map_err(sqlite_err)?;
160    let (name, etype, obs_json): (String, String, String) = stmt
161        .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162        .map_err(sqlite_err)?;
163    let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164    Ok(Entity {
165        name,
166        entity_type: etype,
167        observations,
168    })
169}
170
/// Which side of a relation to follow during traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Outgoing,
    Incoming,
    Both,
}

impl Direction {
    /// Parses an optional direction label.
    ///
    /// Exactly `"OUTGOING"` and `"INCOMING"` (case-sensitive) select the
    /// matching variant; `None` and every other string yield
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        let Some(label) = s else {
            return Direction::Both;
        };
        if label == "OUTGOING" {
            Direction::Outgoing
        } else if label == "INCOMING" {
            Direction::Incoming
        } else {
            Direction::Both
        }
    }
}
188
/// Appends `raw` to `buf` as a quoted JSON string literal.
///
/// Writing straight into the caller's buffer avoids allocating a temporary
/// `serde_json::Value` for the JSON-RPC wrapper.
///
/// `"` and `\` are backslash-escaped, the control characters that have a short
/// form use it (`\n`, `\r`, `\t`, `\b`, `\f`), and every other byte below
/// `0x20` is written as `\u00XX`. All remaining bytes, including multi-byte
/// UTF-8 sequences, are copied through unchanged.
///
/// Escaping is done in a single pass. The earlier two-pass version flushed the
/// text before each short escape verbatim, so a control character followed
/// later by e.g. a quote was emitted raw and produced invalid JSON.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the pending run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        // `Some(c)` => two-character escape `\c`; `None` => `\u00XX`.
        let short: Option<char> = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            0x00..=0x1F => None,
            _ => continue,
        };
        // Every escaped byte is ASCII, so `i` is always a char boundary and
        // slicing `raw` here cannot split a multi-byte sequence.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the six-character escape `\u00XX` (lowercase hex) for byte `b`.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Formatting into a `String` does not fail, so the result can be ignored.
    let _ = write!(buf, "\\u{:04x}", b);
}
229
230// ── MetaCache ────────────────────────────────────────────────────────────
231
/// Compact per-entity record stored in the in-memory LRU cache, keyed by
/// entity name. The fields mirror the identically named columns of the
/// `entity` table as read by `get_entity_id` / `get_entity`.
#[derive(Copy, Clone)]
struct EntityMeta {
    /// Row id of the entity (`entity.id`).
    id: i64,
    /// Id of the entity's type in `type_dict` (`entity.type_id`).
    type_id: i64,
    /// Value of `entity.obs_count` at the time the entry was cached.
    obs_count: i64,
    /// Value of `entity.out_deg` at the time the entry was cached.
    out_deg: i64,
    /// Value of `entity.in_deg` at the time the entry was cached.
    in_deg: i64,
}
240
241// ── Transaction guard (RAII rollback on error) ─────────────────────────
242
243struct TxGuard<'a> {
244    conn: &'a Connection,
245    done: bool,
246}
247
248impl<'a> TxGuard<'a> {
249    fn begin(conn: &'a Connection) -> Result<Self> {
250        // BEGIN IMMEDIATE acquires the WAL write lock up front rather than
251        // lazily on the first write. This makes the busy-timeout apply to lock
252        // acquisition deterministically and avoids `SQLITE_BUSY_SNAPSHOT`
253        // surprises when readers are concurrently active.
254        conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255        Ok(Self { conn, done: false })
256    }
257
258    fn commit(mut self) -> Result<()> {
259        self.done = true;
260        self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261    }
262}
263
264impl Drop for TxGuard<'_> {
265    fn drop(&mut self) {
266        if !self.done {
267            let _ = self.conn.execute_batch("ROLLBACK");
268        }
269    }
270}
271
272// ── Reader pool ───────────────────────────────────────────────────────────
273
274/// A small fixed pool of `query_only` SQLite connections used for read
275/// operations. WAL mode permits any number of concurrent readers alongside the
276/// single writer, so spreading reads across several connections lets them run
277/// in parallel instead of serializing on the writer's mutex.
278struct ReaderPool {
279    conns: Vec<Mutex<Connection>>,
280    next: AtomicUsize,
281}
282
283impl ReaderPool {
284    /// Acquire a reader connection. Fast path: grab the first idle one. If every
285    /// connection is busy, block on a round-robin pick so callers still make
286    /// progress (and never spin).
287    fn get(&self) -> MutexGuard<'_, Connection> {
288        for c in &self.conns {
289            if let Some(g) = c.try_lock() {
290                return g;
291            }
292        }
293        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294        self.conns[i].lock()
295    }
296}
297
298// ── GraphHandle ──────────────────────────────────────────────────────────
299
/// Handle to the SQLite-backed graph: one writer connection, a pool of reader
/// connections, in-memory id sequences and a name-keyed metadata cache.
pub struct GraphHandle {
    /// The single read-write connection. SQLite allows only one writer, so all
    /// mutations serialize here.
    writer: Mutex<Connection>,
    /// Pool of `query_only` connections for concurrent reads (WAL).
    readers: ReaderPool,
    /// Last entity id handed out; initialised from the `entity_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_entity: AtomicI64,
    /// Last observation id handed out; initialised from the `obs_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_obs: AtomicI64,
    /// LRU cache from entity name to its [`EntityMeta`].
    cache: Mutex<LruCache<String, EntityMeta>>,
}
310
311/// Open one `query_only` reader connection against an existing WAL database.
312///
313/// The connection is opened read-write at the OS level (so it can attach to the
314/// `-shm` wal-index — SQLite cannot read a WAL database through a pure
315/// `SQLITE_OPEN_READ_ONLY` handle) and then locked to reads with
316/// `PRAGMA query_only = ON`, which makes any accidental write error out.
317fn open_reader(path: &Path, mmap_size: i64) -> Result<Connection> {
318    let conn = Connection::open_with_flags(
319        path,
320        OpenFlags::SQLITE_OPEN_READ_WRITE
321            | OpenFlags::SQLITE_OPEN_NO_MUTEX
322            | OpenFlags::SQLITE_OPEN_URI,
323    )
324    .map_err(sqlite_err)?;
325    conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
326    conn.execute_batch(
327        "PRAGMA query_only   = ON;
328         PRAGMA cache_size   = -50000;
329         PRAGMA temp_store   = MEMORY;",
330    )
331    .map_err(sqlite_err)?;
332    conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
333        .map_err(sqlite_err)?;
334    Ok(conn)
335}
336
337impl GraphHandle {
338    pub fn new(
339        path: &Path,
340        durability: Durability,
341        mmap_size: i64,
342        lru_cache_size: NonZeroUsize,
343        read_pool_size: usize,
344    ) -> Result<Self> {
345        {
346            let tmp = Connection::open(path).map_err(sqlite_err)?;
347            tmp.execute_batch("PRAGMA page_size = 16384;")
348                .map_err(sqlite_err)?;
349            drop(tmp);
350        }
351
352        let conn = Connection::open(path).map_err(sqlite_err)?;
353        // Apply the busy handler through the API so it is in force for every
354        // subsequent statement (including schema creation and BEGIN IMMEDIATE).
355        conn.busy_timeout(Duration::from_secs(10)).map_err(sqlite_err)?;
356
357        conn.execute_batch(
358             "PRAGMA journal_mode = WAL;
359             PRAGMA foreign_keys = OFF;
360             PRAGMA cache_size    = -50000;
361             PRAGMA temp_store    = MEMORY;
362             PRAGMA busy_timeout  = 5000;
363             PRAGMA synchronous   = NORMAL;
364             PRAGMA journal_size_limit = 67108864;
365
366             CREATE TABLE IF NOT EXISTS entity (
367                 id          INTEGER PRIMARY KEY,
368                 name_hash   INTEGER NOT NULL,
369                 name        TEXT    NOT NULL,
370                 type_id     INTEGER NOT NULL,
371                 obs_count   INTEGER NOT NULL DEFAULT 0,
372                 out_deg     INTEGER NOT NULL DEFAULT 0,
373                 in_deg      INTEGER NOT NULL DEFAULT 0,
374                 created_us  INTEGER NOT NULL,
375                 updated_us  INTEGER NOT NULL,
376                 flags       INTEGER NOT NULL DEFAULT 0
377             ) STRICT;
378
379             CREATE INDEX IF NOT EXISTS entity_by_hash
380                 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
381                 WHERE flags = 0;
382
383             CREATE INDEX IF NOT EXISTS entity_name_ci
384                 ON entity(lower(name))
385                 WHERE flags = 0;
386
387             CREATE TABLE IF NOT EXISTS observation (
388                 id          INTEGER PRIMARY KEY,
389                 entity_id   INTEGER NOT NULL,
390                 idx         INTEGER NOT NULL,
391                 body        TEXT    NOT NULL,
392                 created_us  INTEGER NOT NULL
393             ) STRICT;
394
395             CREATE INDEX IF NOT EXISTS obs_by_entity
396                 ON observation(entity_id, idx);
397
398             CREATE TABLE IF NOT EXISTS relation (
399                 from_id     INTEGER NOT NULL,
400                 to_id       INTEGER NOT NULL,
401                 type_id     INTEGER NOT NULL,
402                 created_us  INTEGER NOT NULL
403             ) STRICT;
404
405             CREATE INDEX IF NOT EXISTS rel_out
406                 ON relation(from_id, type_id, to_id);
407
408             CREATE INDEX IF NOT EXISTS rel_in
409                 ON relation(to_id, type_id, from_id);
410
411             CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
412                 USING fts5(name, content='entity', content_rowid='id',
413                            tokenize='unicode61 remove_diacritics 2');
414
415             CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
416                 USING fts5(body, content='observation', content_rowid='id',
417                            tokenize='unicode61 remove_diacritics 2');
418
419             CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
420               INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
421             END;
422
423             CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
424               INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
425             END;
426
427             CREATE TABLE IF NOT EXISTS type_dict (
428                 id     INTEGER PRIMARY KEY,
429                 kind   INTEGER NOT NULL,
430                 name   TEXT    NOT NULL,
431                 count  INTEGER NOT NULL DEFAULT 0
432             ) STRICT;
433
434             CREATE INDEX IF NOT EXISTS type_by_name
435                 ON type_dict(kind, name);
436
437             CREATE TABLE IF NOT EXISTS graph_stat (
438                 key    TEXT NOT NULL PRIMARY KEY,
439                 value  INTEGER NOT NULL
440             ) STRICT, WITHOUT ROWID;
441
442             CREATE TABLE IF NOT EXISTS hub_degree (
443                 entity_id INTEGER PRIMARY KEY,
444                 out_deg   INTEGER NOT NULL,
445                 in_deg    INTEGER NOT NULL
446             ) STRICT;
447
448             CREATE TABLE IF NOT EXISTS partition_map (
449                 table_name TEXT NOT NULL PRIMARY KEY,
450                 role       INTEGER NOT NULL,
451                 type_id    INTEGER,
452                 row_count  INTEGER NOT NULL DEFAULT 0
453             ) STRICT, WITHOUT ROWID;",
454        )
455        .map_err(sqlite_err)?;
456
457        conn.execute_batch(&format!("PRAGMA mmap_size = {mmap_size};"))
458            .map_err(sqlite_err)?;
459
460        let sync_pragma = match durability {
461            Durability::Sync => "PRAGMA synchronous = FULL",
462            Durability::Async => "PRAGMA synchronous = NORMAL",
463        };
464        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
465
466        let has_stat: bool = conn
467            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
468            .is_ok();
469        if !has_stat {
470            conn.execute_batch(
471                "INSERT INTO graph_stat(key, value) VALUES
472                 ('entities', 0), ('relations', 0), ('observations', 0),
473                 ('entity_seq', 0), ('obs_seq', 0);",
474            )
475            .map_err(sqlite_err)?;
476        }
477
478        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
479
480        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
481        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
482
483        // Open the reader pool against the now-initialized database. At least one
484        // reader is always created.
485        let pool_size = read_pool_size.max(1);
486        let mut conns = Vec::with_capacity(pool_size);
487        for _ in 0..pool_size {
488            conns.push(Mutex::new(open_reader(path, mmap_size)?));
489        }
490        let readers = ReaderPool {
491            conns,
492            next: AtomicUsize::new(0),
493        };
494
495        Ok(Self {
496            writer: Mutex::new(conn),
497            readers,
498            seq_entity: AtomicI64::new(seq_entity),
499            seq_obs: AtomicI64::new(seq_obs),
500            cache: Mutex::new(LruCache::new(lru_cache_size)),
501        })
502    }
503
504    fn next_entity_id(&self) -> i64 {
505        self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
506    }
507
508    fn next_obs_id(&self) -> i64 {
509        self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
510    }
511
512    fn meta_get(&self, name: &str) -> Option<EntityMeta> {
513        self.cache.lock().get(name).copied()
514    }
515
516    fn meta_set(&self, name: &str, m: EntityMeta) {
517        self.cache.lock().put(name.to_string(), m);
518    }
519
520    fn meta_remove(&self, name: &str) {
521        self.cache.lock().pop(name);
522    }
523
524    fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
525        let mut cache = self.cache.lock();
526        if let Some(m) = cache.get_mut(name) {
527            f(m);
528        }
529    }
530
531    fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
532        if let Some(m) = self.meta_get(name) {
533            return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
534        }
535        let h = name_hash(name);
536        let mut stmt = conn
537            .prepare_cached(
538                "SELECT id, type_id, obs_count, out_deg, in_deg
539                 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
540            )
541            .map_err(sqlite_err)?;
542        match stmt.query_row(params![h, name], |row| {
543            Ok(EntityMeta {
544                id: row.get(0)?,
545                type_id: row.get(1)?,
546                obs_count: row.get(2)?,
547                out_deg: row.get(3)?,
548                in_deg: row.get(4)?,
549            })
550        }) {
551            Ok(m) => {
552                self.meta_set(name, m);
553                Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
554            }
555            Err(e) if is_not_found(&e) => Ok(None),
556            Err(e) => Err(sqlite_err(e)),
557        }
558    }
559
560    fn sync_seqs(&self, conn: &Connection) -> Result<()> {
561        let seq_e = self.seq_entity.load(Ordering::Relaxed);
562        let seq_o = self.seq_obs.load(Ordering::Relaxed);
563        conn.execute(
564            "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
565             WHERE key IN ('entity_seq', 'obs_seq')",
566            params![seq_e, seq_o],
567        )
568        .map_err(sqlite_err)?;
569        Ok(())
570    }
571
572    // ── Public API ──────────────────────────────────────────────────────
573
574    pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
575        if name.is_empty() {
576            return Ok(None);
577        }
578
579        if let Some(m) = self.meta_get(name) {
580            let conn = self.readers.get();
581            let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
582            let observations = load_observations_opt(&conn, m.id);
583            return Ok(Some(Entity {
584                name: name.to_string(),
585                entity_type: etype,
586                observations,
587            }));
588        }
589
590        let conn = self.readers.get();
591        let h = name_hash(name);
592        let mut stmt = conn
593            .prepare_cached(
594                "SELECT e.id, e.type_id, e.name, t.name,
595                        e.obs_count, e.out_deg, e.in_deg
596                 FROM entity e
597                 JOIN type_dict t ON t.id = e.type_id
598                 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
599            )
600            .map_err(sqlite_err)?;
601        match stmt.query_row(params![h, name], |row| {
602            let id: i64 = row.get(0)?;
603            let type_id: i64 = row.get(1)?;
604            let ename: String = row.get(2)?;
605            let etype: String = row.get(3)?;
606            let obs_count: i64 = row.get(4)?;
607            let out_deg: i64 = row.get(5)?;
608            let in_deg: i64 = row.get(6)?;
609            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
610        }) {
611            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
612                let observations = load_observations_opt(&conn, id);
613                drop(stmt);
614                drop(conn);
615                self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
616                Ok(Some(Entity {
617                    name: ename,
618                    entity_type: etype,
619                    observations,
620                }))
621            }
622            Err(e) if is_not_found(&e) => Ok(None),
623            Err(e) => Err(sqlite_err(e)),
624        }
625    }
626
627    pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
628        let conn = self.writer.lock();
629        let tx = TxGuard::begin(&conn)?;
630
631        let mut ins_ent = conn
632            .prepare_cached(
633                "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
634                 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
635                 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
636            )
637            .map_err(sqlite_err)?;
638
639        let mut ins_fts = conn
640            .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
641            .map_err(sqlite_err)?;
642
643        let batch_ts = now_us();
644        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
645        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
646        let mut total_entities: i64 = 0;
647        let mut total_obs: i64 = 0;
648        let mut created = Vec::new();
649        let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
650        let mut obs_sql = String::new();
651
652        for entity in entities {
653            if entity.name.is_empty() {
654                continue;
655            }
656            let h = name_hash(&entity.name);
657            let id = self.next_entity_id();
658            let type_id = match type_cache.get(entity.entity_type.as_str()) {
659                Some(t) => *t,
660                None => {
661                    let t = get_type_id(&conn, &entity.entity_type, 0)?;
662                    type_cache.insert(entity.entity_type.clone(), t);
663                    t
664                }
665            };
666            let obs_count = entity.observations.len() as i64;
667
668            let changed = ins_ent
669                .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
670                .map_err(sqlite_err)?;
671            if changed == 0 {
672                continue;
673            }
674
675            let n = entity.observations.len();
676            if n > 0 {
677                obs_sql.clear();
678
679                let mut oids = Vec::with_capacity(n);
680                let mut idxs = Vec::with_capacity(n);
681                for _ in 0..n {
682                    oids.push(self.next_obs_id());
683                }
684                for i in 0..n as i64 {
685                    idxs.push(i);
686                }
687
688                obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
689                for i in 0..n {
690                    if i > 0 { obs_sql.push(','); }
691                    obs_sql.push_str("(?,?,?,?,?)");
692                }
693
694                let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
695                for (i, obs) in entity.observations.iter().enumerate() {
696                    obs_params.push(&oids[i]);
697                    obs_params.push(&id);
698                    obs_params.push(&idxs[i]);
699                    obs_params.push(obs);
700                    obs_params.push(&batch_ts);
701                }
702
703                conn.execute(&obs_sql, obs_params.as_slice())
704                    .map_err(sqlite_err)?;
705            }
706
707            ins_fts
708                .execute(params![id, entity.name])
709                .map_err(sqlite_err)?;
710
711            *type_deltas.entry(type_id).or_insert(0) += 1;
712            total_entities += 1;
713            total_obs += obs_count;
714
715            created.push(entity.clone());
716            created_metas.push((entity.name.clone(), EntityMeta {
717                id,
718                type_id,
719                obs_count,
720                out_deg: 0,
721                in_deg: 0,
722            }));
723        }
724
725        if total_entities > 0 {
726            for (type_id, delta) in &type_deltas {
727                inc_type_count(&conn, *type_id, *delta)?;
728            }
729            inc_graph_stat(&conn, "entities", total_entities)?;
730            inc_graph_stat(&conn, "observations", total_obs)?;
731            self.sync_seqs(&conn)?;
732        }
733
734        tx.commit()?;
735
736        // Note: `PRAGMA optimize` is intentionally *not* run here. It analyzes
737        // indexes and writes to internal stat tables — pure overhead on the
738        // write hot path. The periodic `run_maintenance` task covers it.
739
740        if !created_metas.is_empty() {
741            let mut cache = self.cache.lock();
742            for (name, meta) in &created_metas {
743                cache.put(name.clone(), *meta);
744            }
745        }
746
747        Ok(created)
748    }
749
750    pub fn delete_entities(&self, names: &[String]) -> Result<()> {
751        if names.is_empty() {
752            return Ok(());
753        }
754        let conn = self.writer.lock();
755
756        // Phase 1: Resolve all names to (id, type_id).
757        let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
758        let mut sel = conn
759            .prepare_cached(
760                "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
761            )
762            .map_err(sqlite_err)?;
763        for name in names {
764            let h = name_hash(name);
765            let (id, type_id) = match sel.query_row(params![h, name], |row| {
766                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
767            }) {
768                Ok(v) => v,
769                Err(e) if is_not_found(&e) => continue,
770                Err(e) => return Err(sqlite_err(e)),
771            };
772            resolved.push((id, type_id, name.clone()));
773        }
774
775        if resolved.is_empty() {
776            return Ok(());
777        }
778
779        let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
780        let n = ids.len();
781
782        // Phase 2: Batch DELETE observations.
783        let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
784        let obs_sql = format!(
785            "DELETE FROM observation WHERE entity_id IN ({})",
786            obs_p.join(",")
787        );
788        let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
789        let obs_deleted = conn
790            .execute(&obs_sql, obs_refs.as_slice())
791            .map_err(sqlite_err)? as i64;
792
793        // Phase 3: Batch DELETE relations.
794        let rel_sql = format!(
795            "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
796            obs_p.join(","),
797            obs_p.join(",")
798        );
799        let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
800        let rel_deleted = conn
801            .execute(&rel_sql, rel_refs.as_slice())
802            .map_err(sqlite_err)? as i64;
803
804        // Phase 4: Batch FTS deletes.
805        let fts_values: Vec<String> = (0..n)
806            .map(|_| "('delete', ?, '')".to_string())
807            .collect();
808        let fts_sql = format!(
809            "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
810            fts_values.join(", ")
811        );
812        conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
813            .map_err(sqlite_err)?;
814
815        // Aggregate type count deltas.
816        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
817        for &(_, type_id, _) in &resolved {
818            *type_deltas.entry(type_id).or_insert(0) += 1;
819        }
820
821        // Phase 5: Batch type count decrements.
822        if !type_deltas.is_empty() {
823            let m = type_deltas.len();
824            let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
825            let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
826            let mut case_parts: Vec<String> = Vec::with_capacity(m);
827            let mut id_parts: Vec<String> = Vec::with_capacity(m);
828            for i in 0..m {
829                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
830                id_parts.push(format!("?{}", i + 1));
831            }
832            let sql = format!(
833                "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
834                case_parts.join(" "),
835                id_parts.join(","),
836            );
837            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
838            for id in &type_keys {
839                params.push(Box::new(*id));
840            }
841            for delta in &type_vals {
842                params.push(Box::new(*delta));
843            }
844            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
845            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
846        }
847
848        // Phase 6: Batch DELETE entities.
849        conn.execute(
850            &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
851            ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
852        )
853        .map_err(sqlite_err)?;
854
855        // Phase 7: Update stats.
856        inc_graph_stat(&conn, "entities", -(n as i64))?;
857        inc_graph_stat(&conn, "observations", -obs_deleted)?;
858        inc_graph_stat(&conn, "relations", -rel_deleted)?;
859
860        // Phase 8: Remove from cache.
861        for (_, _, name) in &resolved {
862            self.meta_remove(name);
863        }
864
865        Ok(())
866    }
867
868    pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
869        let conn = self.writer.lock();
870        let tx = TxGuard::begin(&conn)?;
871
872        let mut ins = conn
873            .prepare_cached(
874                "INSERT INTO relation (from_id, to_id, type_id, created_us)
875                 SELECT ?1, ?2, ?3, ?4
876                 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
877            )
878            .map_err(sqlite_err)?;
879
880        let ts = now_us();
881        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
882        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
883        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
884        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
885        let mut total_relations: i64 = 0;
886        let mut created = Vec::new();
887
888        for rel in relations {
889            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
890                Some(v) => v,
891                None => continue,
892            };
893            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
894                Some(v) => v,
895                None => continue,
896            };
897            let type_id = match type_cache.get(rel.relation_type.as_str()) {
898                Some(t) => *t,
899                None => {
900                    let t = get_type_id(&conn, &rel.relation_type, 1)?;
901                    type_cache.insert(rel.relation_type.clone(), t);
902                    t
903                }
904            };
905
906            let changed = ins
907                .execute(params![from_id, to_id, type_id, ts])
908                .map_err(sqlite_err)?;
909            if changed == 0 {
910                continue;
911            }
912
913            *out_deltas.entry(from_id).or_insert(0) += 1;
914            *in_deltas.entry(to_id).or_insert(0) += 1;
915            *type_deltas.entry(type_id).or_insert(0) += 1;
916            total_relations += 1;
917
918            created.push(rel.clone());
919        }
920
921        if total_relations > 0 {
922            for (id, delta) in &out_deltas {
923                conn.execute(
924                    "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
925                    params![delta, id],
926                )
927                .map_err(sqlite_err)?;
928            }
929            for (id, delta) in in_deltas {
930                conn.execute(
931                    "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
932                    params![delta, id],
933                )
934                .map_err(sqlite_err)?;
935            }
936            for (type_id, delta) in &type_deltas {
937                inc_type_count(&conn, *type_id, *delta)?;
938            }
939            inc_graph_stat(&conn, "relations", total_relations)?;
940        }
941
942        tx.commit()?;
943
944        // See `create_entities`: `PRAGMA optimize` is deferred to maintenance.
945
946        if !created.is_empty() {
947            let mut cache = self.cache.lock();
948            for rel in &created {
949                if let Some(m) = cache.get_mut(&rel.from) {
950                    m.out_deg += 1;
951                }
952                if let Some(m) = cache.get_mut(&rel.to) {
953                    m.in_deg += 1;
954                }
955            }
956        }
957
958        Ok(created)
959    }
960
961    pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
962        if relations.is_empty() {
963            return Ok(());
964        }
965        let conn = self.writer.lock();
966
967        // Resolve names to IDs and collect valid triples.
968        let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
969        let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
970        for rel in relations {
971            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
972                Some(v) => v,
973                None => continue,
974            };
975            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
976                Some(v) => v,
977                None => continue,
978            };
979            let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
980                Ok(id) => id,
981                Err(_) => continue,
982            };
983            triples.push((from_id, to_id, type_id));
984            names.push((rel.from.clone(), rel.to.clone()));
985        }
986
987        if triples.is_empty() {
988            return Ok(());
989        }
990
991        // Batch DELETE using VALUES subquery.
992        let mut sql = String::from(
993            "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
994        );
995        for (i, _) in triples.iter().enumerate() {
996            if i > 0 {
997                sql.push_str(", ");
998            }
999            let base = (i * 3) + 1;
1000            sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1001        }
1002        sql.push(')');
1003
1004        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1005        for &(f, t, tp) in &triples {
1006            param_values.push(Box::new(f));
1007            param_values.push(Box::new(t));
1008            param_values.push(Box::new(tp));
1009        }
1010        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1011        let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1012        if total == 0 {
1013            return Ok(());
1014        }
1015
1016        // Aggregate degree and type deltas.
1017        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1018        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1019        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1020        for &(from_id, to_id, type_id) in &triples {
1021            *out_deltas.entry(from_id).or_insert(0) += 1;
1022            *in_deltas.entry(to_id).or_insert(0) += 1;
1023            *type_deltas.entry(type_id).or_insert(0) += 1;
1024        }
1025
1026        // Batch out_deg updates.
1027        let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1028        let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1029        if !out_keys.is_empty() {
1030            let m = out_keys.len();
1031            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1032            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1033            for i in 0..m {
1034                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1035                id_parts.push(format!("?{}", i + 1));
1036            }
1037            let sql = format!(
1038                "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1039                case_parts.join(" "),
1040                id_parts.join(","),
1041            );
1042            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1043            for id in &out_keys {
1044                params.push(Box::new(*id));
1045            }
1046            for delta in &out_vals {
1047                params.push(Box::new(*delta));
1048            }
1049            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1050            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1051        }
1052
1053        // Batch in_deg updates.
1054        let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1055        let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1056        if !in_keys.is_empty() {
1057            let m = in_keys.len();
1058            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1059            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1060            for i in 0..m {
1061                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1062                id_parts.push(format!("?{}", i + 1));
1063            }
1064            let sql = format!(
1065                "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1066                case_parts.join(" "),
1067                id_parts.join(","),
1068            );
1069            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1070            for id in &in_keys {
1071                params.push(Box::new(*id));
1072            }
1073            for delta in &in_vals {
1074                params.push(Box::new(*delta));
1075            }
1076            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1077            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1078        }
1079
1080        // Batch type_dict updates.
1081        let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1082        let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1083        if !type_keys.is_empty() {
1084            let m = type_keys.len();
1085            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1086            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1087            for i in 0..m {
1088                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1089                id_parts.push(format!("?{}", i + 1));
1090            }
1091            let sql = format!(
1092                "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1093                case_parts.join(" "),
1094                id_parts.join(","),
1095            );
1096            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1097            for id in &type_keys {
1098                params.push(Box::new(*id));
1099            }
1100            for delta in &type_vals {
1101                params.push(Box::new(*delta));
1102            }
1103            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1104            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1105        }
1106
1107        inc_graph_stat(&conn, "relations", -(total as i64))?;
1108
1109        // Update cache for resolved triples (self-heals on next reload if
1110        // a triple happened to not match).
1111        for (from, to) in &names {
1112            self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1113            self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1114        }
1115
1116        Ok(())
1117    }
1118
1119    pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1120        let conn = self.writer.lock();
1121        let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1122            Some(v) => v,
1123            None => {
1124                return Err(MCSError::InvalidParams(format!(
1125                    "Entity '{entity_name}' not found"
1126                )))
1127            }
1128        };
1129
1130        let mut max_idx: i64 = conn
1131            .query_row(
1132                "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1133                params![id],
1134                |row| row.get(0),
1135            )
1136            .map_err(sqlite_err)?;
1137
1138        let ts = now_us();
1139        let mut ins_obs = conn
1140            .prepare_cached(
1141                "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1142            )
1143            .map_err(sqlite_err)?;
1144
1145        for content in contents {
1146            max_idx += 1;
1147            let oid = self.next_obs_id();
1148            ins_obs
1149                .execute(params![oid, id, max_idx, content, ts])
1150                .map_err(sqlite_err)?;
1151        }
1152        let added = contents.to_vec();
1153
1154        let count: i64 = contents.len() as i64;
1155        conn.execute(
1156            "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1157            params![count, ts, id],
1158        )
1159        .map_err(sqlite_err)?;
1160
1161        inc_graph_stat(&conn, "observations", count)?;
1162        self.sync_seqs(&conn)?;
1163
1164        self.meta_update(entity_name, |m| m.obs_count += count);
1165
1166        Ok(added)
1167    }
1168
1169    pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1170        if observations.is_empty() {
1171            return Ok(());
1172        }
1173        let conn = self.writer.lock();
1174        let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1175            Some(v) => v,
1176            None => {
1177                return Err(MCSError::InvalidParams(format!(
1178                    "Entity '{entity_name}' not found"
1179                )))
1180            }
1181        };
1182
1183        let placeholders: Vec<String> = (0..observations.len())
1184            .map(|i| format!("?{}", i + 2))
1185            .collect();
1186        let sql = format!(
1187            "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1188            placeholders.join(",")
1189        );
1190
1191        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1192        param_values.push(Box::new(id));
1193        for obs in observations {
1194            param_values.push(Box::new(obs.as_str()));
1195        }
1196        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1197        let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1198
1199        if removed > 0 {
1200            conn.execute(
1201                "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1202                params![removed, now_us(), id],
1203            )
1204            .map_err(sqlite_err)?;
1205            inc_graph_stat(&conn, "observations", -removed)?;
1206
1207            self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1208        }
1209
1210        Ok(())
1211    }
1212
1213    pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1214        let mut results = Vec::new();
1215        for entity in entities {
1216            if let Some(existing) = self.get_entity(&entity.name)? {
1217                // Update type if different.
1218                if existing.entity_type != entity.entity_type {
1219                    let conn = self.writer.lock();
1220                    let old_type_id = conn
1221                        .query_row(
1222                            "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1223                            params![name_hash(&entity.name), entity.name],
1224                            |row| row.get::<_, i64>(0),
1225                        )
1226                        .map_err(sqlite_err)?;
1227                    let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1228                    inc_type_count(&conn, old_type_id, -1)?;
1229                    inc_type_count(&conn, new_type_id, 1)?;
1230                    conn.execute(
1231                        "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1232                        params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1233                    )
1234                    .map_err(sqlite_err)?;
1235                    // Invalidate cache so subsequent get_entity reloads meta.
1236                    self.meta_remove(&entity.name);
1237                }
1238                // Merge observations (append new ones not already present).
1239                let existing_set: HashSet<&str> =
1240                    existing.observations.iter().map(|s| s.as_str()).collect();
1241                let to_add: Vec<String> = entity
1242                    .observations
1243                    .iter()
1244                    .filter(|o| !existing_set.contains(o.as_str()))
1245                    .cloned()
1246                    .collect();
1247                if !to_add.is_empty() {
1248                    self.add_observations(&entity.name, &to_add)?;
1249                }
1250                let updated = self
1251                    .get_entity(&entity.name)?
1252                    .unwrap_or(entity.clone());
1253                results.push(updated);
1254            } else {
1255                let c = self.create_entities(std::slice::from_ref(entity))?;
1256                if let Some(e) = c.into_iter().next() {
1257                    results.push(e);
1258                }
1259            }
1260        }
1261        Ok(results)
1262    }
1263
1264    pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1265        let conn = self.writer.lock();
1266        let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1267            Some(v) => v,
1268            None => {
1269                return Err(MCSError::InvalidParams(format!(
1270                    "Source entity '{source}' not found"
1271                )))
1272            }
1273        };
1274        let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1275            Some(v) => v,
1276            None => {
1277                return Err(MCSError::InvalidParams(format!(
1278                    "Target entity '{target}' not found"
1279                )))
1280            }
1281        };
1282
1283        if src_id == tgt_id {
1284            return self.get_entity(target)?.ok_or_else(|| {
1285                MCSError::InvalidParams("Target entity not found after merge".into())
1286            });
1287        }
1288
1289        // Move observations from source to target.
1290        let mut obs_count: i64 = 0;
1291        {
1292            let mut max_idx: i64 = conn
1293                .query_row(
1294                    "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1295                    params![tgt_id],
1296                    |row| row.get(0),
1297                )
1298                .map_err(sqlite_err)?;
1299            let mut sel_obs = conn
1300                .prepare_cached(
1301                    "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1302                )
1303                .map_err(sqlite_err)?;
1304            let mut upd_obs = conn
1305                .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1306                .map_err(sqlite_err)?;
1307            let rows: Vec<(i64, String)> = sel_obs
1308                .query_map(params![src_id], |row| {
1309                    Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1310                })
1311                .map_err(sqlite_err)?
1312                .filter_map(|r| r.ok())
1313                .collect();
1314            for (oid, _body) in &rows {
1315                max_idx += 1;
1316                upd_obs
1317                    .execute(params![tgt_id, max_idx, oid])
1318                    .map_err(sqlite_err)?;
1319                obs_count += 1;
1320            }
1321        }
1322
1323        // Move relations from source to target.
1324        conn.execute(
1325            "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1326            params![tgt_id, src_id],
1327        )
1328        .map_err(sqlite_err)?;
1329        conn.execute(
1330            "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1331            params![tgt_id, src_id],
1332        )
1333        .map_err(sqlite_err)?;
1334        // Delete orphaned relations that were updated by the above (the "OR IGNORE"
1335        // keeps the first, but we still have the original row with the old id? No —
1336        // UPDATE OR IGNORE won't remove. So we must delete any that still reference src_id.)
1337        conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1338            .map_err(sqlite_err)?;
1339        conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1340            .map_err(sqlite_err)?;
1341
1342        // Update degrees on target.
1343        let out_add: i64 = conn
1344            .query_row(
1345                "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1346                params![tgt_id],
1347                |row| row.get(0),
1348            )
1349            .map_err(sqlite_err)?;
1350        let in_add: i64 = conn
1351            .query_row(
1352                "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1353                params![tgt_id],
1354                |row| row.get(0),
1355            )
1356            .map_err(sqlite_err)?;
1357        conn.execute(
1358            "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1359            params![out_add, in_add, obs_count, now_us(), tgt_id],
1360        )
1361        .map_err(sqlite_err)?;
1362
1363        // Delete source entity.
1364        conn.execute(
1365            "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1366            params![src_id],
1367        )
1368        .map_err(sqlite_err)?;
1369        conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1370            .map_err(sqlite_err)?;
1371
1372        inc_graph_stat(&conn, "entities", -1)?;
1373        self.meta_remove(source);
1374
1375        // Reload target into cache.
1376        if let Ok(meta) = conn.query_row(
1377            "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1378            params![tgt_id],
1379            |row| {
1380                Ok(EntityMeta {
1381                    id: row.get(0)?,
1382                    type_id: row.get(1)?,
1383                    obs_count: row.get(2)?,
1384                    out_deg: row.get(3)?,
1385                    in_deg: row.get(4)?,
1386                })
1387            },
1388        ) {
1389            self.meta_set(target, meta);
1390        }
1391
1392        let (name, etype): (String, String) = conn
1393            .query_row(
1394                "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1395                params![tgt_id],
1396                |row| Ok((row.get(0)?, row.get(1)?)),
1397            )
1398            .map_err(sqlite_err)?;
1399        let observations = load_observations_opt(&conn, tgt_id);
1400
1401        Ok(Entity {
1402            name,
1403            entity_type: etype,
1404            observations,
1405        })
1406    }
1407
1408    pub fn search_nodes_filtered(
1409        &self,
1410        query: &str,
1411        filter_type: Option<&str>,
1412        offset: usize,
1413        limit: usize,
1414    ) -> Vec<Entity> {
1415        if query.is_empty() {
1416            return Vec::new();
1417        }
1418        let conn = self.readers.get();
1419
1420        // Single pass: collect IDs from name_fts then obs_fts, deduping with a set.
1421        let mut entity_ids: Vec<i64> = Vec::new();
1422        let mut seen: HashSet<i64> = HashSet::new();
1423
1424        if let Ok(mut stmt) = conn.prepare(
1425            "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1426        ) {
1427            let limit_i64 = (limit + offset) as i64;
1428            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1429                row.get::<_, i64>(0)
1430            }) {
1431                for row in rows.flatten() {
1432                    if seen.insert(row) {
1433                        entity_ids.push(row);
1434                    }
1435                }
1436            }
1437        }
1438
1439        if let Ok(mut stmt) = conn.prepare(
1440            "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1441             WHERE obs_fts MATCH ?1
1442             GROUP BY entity_id
1443             LIMIT ?2",
1444        ) {
1445            let limit_i64 = (limit + offset) as i64;
1446            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1447                row.get::<_, i64>(0)
1448            }) {
1449                for row in rows.flatten() {
1450                    if seen.insert(row) {
1451                        entity_ids.push(row);
1452                    }
1453                }
1454            }
1455        }
1456
1457        // Apply filter_type, offset, limit.
1458        let mut results = Vec::new();
1459        let mut count: usize = 0;
1460        for eid in entity_ids {
1461            if let Ok(entity) = entity_by_id(&conn, eid) {
1462                if let Some(ft) = filter_type
1463                    && !ft.is_empty() && entity.entity_type != ft {
1464                        continue;
1465                    }
1466                if count < offset {
1467                    count += 1;
1468                    continue;
1469                }
1470                if results.len() >= limit {
1471                    break;
1472                }
1473                results.push(entity);
1474                count += 1;
1475            }
1476        }
1477
1478        results
1479    }
1480
1481    pub fn read_graph_filtered(
1482        &self,
1483        filter_type: Option<&str>,
1484        offset: usize,
1485        limit: usize,
1486    ) -> Result<String> {
1487        let conn = self.readers.get();
1488
1489        let limit_sql: i64 = if limit == usize::MAX {
1490            -1
1491        } else {
1492            limit.min(i64::MAX as usize) as i64
1493        };
1494        let offset_sql: i64 = offset as i64;
1495
1496        // Resolve the requested page of entity ids first. Relations are then
1497        // scoped to edges whose *both* endpoints fall inside this page, which
1498        // keeps the response self-consistent (no dangling references to
1499        // entities that were paged out) and bounds the relation payload by the
1500        // page size instead of dumping every relation in the graph.
1501        let filter = filter_type.filter(|ft| !ft.is_empty());
1502        let ids: Vec<i64> = if let Some(ft) = filter {
1503            let mut stmt = conn
1504                .prepare_cached(
1505                    "SELECT e.id FROM entity e
1506                     WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1507                       AND e.flags = 0
1508                     ORDER BY e.id LIMIT ?2 OFFSET ?3",
1509                )
1510                .map_err(sqlite_err)?;
1511            stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1512                .map_err(sqlite_err)?
1513                .filter_map(|r| r.ok())
1514                .collect()
1515        } else {
1516            let mut stmt = conn
1517                .prepare_cached(
1518                    "SELECT e.id FROM entity e WHERE e.flags = 0
1519                     ORDER BY e.id LIMIT ?1 OFFSET ?2",
1520                )
1521                .map_err(sqlite_err)?;
1522            stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1523                .map_err(sqlite_err)?
1524                .filter_map(|r| r.ok())
1525                .collect()
1526        };
1527
1528        if ids.is_empty() {
1529            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1530        }
1531
1532        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1533
1534        let entities_json: String = {
1535            let sql = format!(
1536                "SELECT COALESCE(json_group_array(json_object(
1537                    'name', e.name,
1538                    'entityType', t.name,
1539                    'observations', COALESCE((
1540                        SELECT json_group_array(o.body ORDER BY o.idx)
1541                        FROM observation o WHERE o.entity_id = e.id
1542                    ), json('[]'))
1543                ) ORDER BY e.id), json('[]'))
1544                FROM entity e
1545                JOIN type_dict t ON t.id = e.type_id
1546                WHERE e.id IN ({placeholders}) AND e.flags = 0"
1547            );
1548            conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1549                row.get::<_, String>(0)
1550            })
1551            .map_err(sqlite_err)?
1552        };
1553
1554        let relations_json: String = {
1555            let sql = format!(
1556                "SELECT COALESCE(json_group_array(json_object(
1557                    'from', e1.name,
1558                    'to', e2.name,
1559                    'relationType', t.name
1560                )), json('[]'))
1561                FROM relation r
1562                JOIN entity e1 ON e1.id = r.from_id
1563                JOIN entity e2 ON e2.id = r.to_id
1564                JOIN type_dict t ON t.id = r.type_id
1565                WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1566                  AND e1.flags = 0 AND e2.flags = 0"
1567            );
1568            let all_params: Vec<&dyn ToSql> = ids
1569                .iter()
1570                .map(|id| id as &dyn ToSql)
1571                .chain(ids.iter().map(|id| id as &dyn ToSql))
1572                .collect();
1573            conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1574                .map_err(sqlite_err)?
1575        };
1576
1577        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1578        out.push_str("{\"entities\":");
1579        out.push_str(&entities_json);
1580        out.push_str(",\"relations\":");
1581        out.push_str(&relations_json);
1582        out.push('}');
1583        Ok(out)
1584    }
1585
1586    pub fn open_nodes(&self, names: &[String]) -> String {
1587        let conn = self.readers.get();
1588        let mut entity_ids: Vec<i64> = Vec::new();
1589
1590        for name in names {
1591            let h = name_hash(name);
1592            if let Ok(Some(id)) = conn
1593                .query_row(
1594                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1595                    params![h, name],
1596                    |row| row.get::<_, i64>(0),
1597                )
1598                .map(Some)
1599                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1600            {
1601                entity_ids.push(id);
1602            }
1603        }
1604
1605        if entity_ids.is_empty() {
1606            return r#"{"entities":[],"relations":[]}"#.to_string();
1607        }
1608
1609        let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1610        let ids_str = placeholders.join(",");
1611
1612        let entities_json: String = {
1613            let sql = format!(
1614                "SELECT COALESCE(json_group_array(json_object(
1615                    'name', e.name,
1616                    'entityType', t.name,
1617                    'observations', COALESCE((
1618                        SELECT json_group_array(o.body ORDER BY o.idx)
1619                        FROM observation o WHERE o.entity_id = e.id
1620                    ), json('[]'))
1621                ) ORDER BY e.id), json('[]'))
1622                FROM entity e
1623                JOIN type_dict t ON t.id = e.type_id
1624                WHERE e.id IN ({ids_str}) AND e.flags = 0"
1625            );
1626            conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1627                row.get::<_, String>(0)
1628            })
1629            .unwrap_or_else(|_| "[]".to_string())
1630        };
1631
1632        let relations_json: String = {
1633            let sql = format!(
1634                "SELECT COALESCE(json_group_array(json_object(
1635                    'from', e1.name,
1636                    'to', e2.name,
1637                    'relationType', t.name
1638                )), json('[]'))
1639                FROM relation r
1640                JOIN entity e1 ON e1.id = r.from_id
1641                JOIN entity e2 ON e2.id = r.to_id
1642                JOIN type_dict t ON t.id = r.type_id
1643                WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1644                  AND e1.flags = 0 AND e2.flags = 0"
1645            );
1646            let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1647                .iter()
1648                .map(|id| id as &dyn rusqlite::types::ToSql)
1649                .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1650                .collect();
1651            let mut stmt = conn.prepare(&sql).unwrap();
1652            stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1653                .unwrap_or_else(|_| "[]".to_string())
1654        };
1655
1656        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1657        out.push_str("{\"entities\":");
1658        out.push_str(&entities_json);
1659        out.push_str(",\"relations\":");
1660        out.push_str(&relations_json);
1661        out.push('}');
1662        out
1663    }
1664
1665    pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1666        let conn = self.readers.get();
1667        let mut results = Vec::with_capacity(names.len());
1668        for name in names {
1669            let h = name_hash(name);
1670            let exists: bool = conn
1671                .query_row(
1672                    "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1673                    params![h, name],
1674                    |_| Ok(()),
1675                )
1676                .is_ok();
1677            results.push(exists);
1678        }
1679        Ok(results)
1680    }
1681
1682    pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1683        let conn = self.readers.get();
1684        let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1685            Some(v) => v,
1686            None => {
1687                return Err(MCSError::InvalidParams(format!(
1688                    "Entity '{name}' not found"
1689                )))
1690            }
1691        };
1692        Ok(match direction {
1693            Direction::Outgoing => out_d as usize,
1694            Direction::Incoming => in_d as usize,
1695            Direction::Both => (out_d + in_d) as usize,
1696        })
1697    }
1698
1699    pub fn get_entity_count(&self) -> Result<usize> {
1700        let conn = self.readers.get();
1701        read_graph_stat(&conn, "entities")
1702            .map(|v| v as usize)
1703            .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1704    }
1705
1706    pub fn get_relation_count(&self) -> Result<usize> {
1707        let conn = self.readers.get();
1708        read_graph_stat(&conn, "relations")
1709            .map(|v| v as usize)
1710            .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1711    }
1712
1713    pub fn search_relations(
1714        &self,
1715        from: Option<&str>,
1716        to: Option<&str>,
1717        rtype: Option<&str>,
1718    ) -> Vec<Relation> {
1719        let conn = self.readers.get();
1720        let mut results = Vec::new();
1721
1722        // A filter that is supplied but resolves to nothing uses the sentinel
1723        // id -1 (which matches no row), so the query returns empty rather than
1724        // silently dropping the filter and matching every relation. The lookups
1725        // are read-only — `get_type_id` would *insert* a phantom type, which is
1726        // both wrong and impossible on a `query_only` reader connection.
1727        let from_id = from
1728            .filter(|f| !f.is_empty())
1729            .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1730        let to_id = to
1731            .filter(|t| !t.is_empty())
1732            .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1733        let type_id = rtype
1734            .filter(|rt| !rt.is_empty())
1735            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1736
1737        match (from_id, to_id, type_id) {
1738            (Some(fid), Some(tid), Some(tpid)) => {
1739                if let Ok(mut stmt) = conn.prepare_cached(
1740                    "SELECT e1.name, e2.name, t.name
1741                     FROM relation r
1742                     JOIN entity e1 ON e1.id = r.from_id
1743                     JOIN entity e2 ON e2.id = r.to_id
1744                     JOIN type_dict t ON t.id = r.type_id
1745                     WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1746                       AND e1.flags = 0 AND e2.flags = 0
1747                     ORDER BY r.from_id, r.to_id"
1748                )
1749                    && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1750                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1751                    }) {
1752                        for row in rows.flatten() { results.push(row); }
1753                    }
1754            }
1755            (Some(fid), Some(tid), None) => {
1756                if let Ok(mut stmt) = conn.prepare_cached(
1757                    "SELECT e1.name, e2.name, t.name
1758                     FROM relation r
1759                     JOIN entity e1 ON e1.id = r.from_id
1760                     JOIN entity e2 ON e2.id = r.to_id
1761                     JOIN type_dict t ON t.id = r.type_id
1762                     WHERE r.from_id = ?1 AND r.to_id = ?2
1763                       AND e1.flags = 0 AND e2.flags = 0
1764                     ORDER BY r.from_id, r.to_id"
1765                )
1766                    && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1767                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1768                    }) {
1769                        for row in rows.flatten() { results.push(row); }
1770                    }
1771            }
1772            (Some(fid), None, Some(tpid)) => {
1773                if let Ok(mut stmt) = conn.prepare_cached(
1774                    "SELECT e1.name, e2.name, t.name
1775                     FROM relation r
1776                     JOIN entity e1 ON e1.id = r.from_id
1777                     JOIN entity e2 ON e2.id = r.to_id
1778                     JOIN type_dict t ON t.id = r.type_id
1779                     WHERE r.from_id = ?1 AND r.type_id = ?2
1780                       AND e1.flags = 0 AND e2.flags = 0
1781                     ORDER BY r.from_id, r.to_id"
1782                )
1783                    && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1784                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1785                    }) {
1786                        for row in rows.flatten() { results.push(row); }
1787                    }
1788            }
1789            (None, Some(tid), Some(tpid)) => {
1790                if let Ok(mut stmt) = conn.prepare_cached(
1791                    "SELECT e1.name, e2.name, t.name
1792                     FROM relation r
1793                     JOIN entity e1 ON e1.id = r.from_id
1794                     JOIN entity e2 ON e2.id = r.to_id
1795                     JOIN type_dict t ON t.id = r.type_id
1796                     WHERE r.to_id = ?1 AND r.type_id = ?2
1797                       AND e1.flags = 0 AND e2.flags = 0
1798                     ORDER BY r.from_id, r.to_id"
1799                )
1800                    && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1801                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1802                    }) {
1803                        for row in rows.flatten() { results.push(row); }
1804                    }
1805            }
1806            (Some(fid), None, None) => {
1807                if let Ok(mut stmt) = conn.prepare_cached(
1808                    "SELECT e1.name, e2.name, t.name
1809                     FROM relation r
1810                     JOIN entity e1 ON e1.id = r.from_id
1811                     JOIN entity e2 ON e2.id = r.to_id
1812                     JOIN type_dict t ON t.id = r.type_id
1813                     WHERE r.from_id = ?1
1814                       AND e1.flags = 0 AND e2.flags = 0
1815                     ORDER BY r.from_id, r.to_id"
1816                )
1817                    && let Ok(rows) = stmt.query_map(params![fid], |row| {
1818                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1819                    }) {
1820                        for row in rows.flatten() { results.push(row); }
1821                    }
1822            }
1823            (None, Some(tid), None) => {
1824                if let Ok(mut stmt) = conn.prepare_cached(
1825                    "SELECT e1.name, e2.name, t.name
1826                     FROM relation r
1827                     JOIN entity e1 ON e1.id = r.from_id
1828                     JOIN entity e2 ON e2.id = r.to_id
1829                     JOIN type_dict t ON t.id = r.type_id
1830                     WHERE r.to_id = ?1
1831                       AND e1.flags = 0 AND e2.flags = 0
1832                     ORDER BY r.from_id, r.to_id"
1833                )
1834                    && let Ok(rows) = stmt.query_map(params![tid], |row| {
1835                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1836                    }) {
1837                        for row in rows.flatten() { results.push(row); }
1838                    }
1839            }
1840            (None, None, Some(tpid)) => {
1841                if let Ok(mut stmt) = conn.prepare_cached(
1842                    "SELECT e1.name, e2.name, t.name
1843                     FROM relation r
1844                     JOIN entity e1 ON e1.id = r.from_id
1845                     JOIN entity e2 ON e2.id = r.to_id
1846                     JOIN type_dict t ON t.id = r.type_id
1847                     WHERE r.type_id = ?1
1848                       AND e1.flags = 0 AND e2.flags = 0
1849                     ORDER BY r.from_id, r.to_id"
1850                )
1851                    && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1852                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1853                    }) {
1854                        for row in rows.flatten() { results.push(row); }
1855                    }
1856            }
1857            (None, None, None) => {
1858                if let Ok(mut stmt) = conn.prepare_cached(
1859                    "SELECT e1.name, e2.name, t.name
1860                     FROM relation r
1861                     JOIN entity e1 ON e1.id = r.from_id
1862                     JOIN entity e2 ON e2.id = r.to_id
1863                     JOIN type_dict t ON t.id = r.type_id
1864                     WHERE e1.flags = 0 AND e2.flags = 0
1865                     ORDER BY r.from_id, r.to_id"
1866                )
1867                    && let Ok(rows) = stmt.query_map([], |row| {
1868                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1869                    }) {
1870                        for row in rows.flatten() { results.push(row); }
1871                    }
1872            }
1873        }
1874        results
1875    }
1876
1877    pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1878        let conn = self.readers.get();
1879        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1880            Some(v) => v,
1881            None => {
1882                return Err(MCSError::InvalidParams(format!(
1883                    "Source entity '{from}' not found"
1884                )))
1885            }
1886        };
1887        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1888            Some(v) => v,
1889            None => {
1890                return Err(MCSError::InvalidParams(format!(
1891                    "Target entity '{to}' not found"
1892                )))
1893            }
1894        };
1895
1896        if from_id == to_id {
1897            return Ok(Some(vec![from.to_string()]));
1898        }
1899
1900        // BFS with adjacency from relation table.
1901        let mut visited = HashSet::new();
1902        let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1903        let mut queue = VecDeque::new();
1904        visited.insert(from_id);
1905        queue.push_back(from_id);
1906
1907        while let Some(cur) = queue.pop_front() {
1908            if cur == to_id {
1909                break;
1910            }
1911            // Fetch out-neighbors.
1912            if let Ok(mut stmt) =
1913                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1914                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1915                    for row in rows.flatten() {
1916                        if visited.insert(row) {
1917                            parent.insert(row, cur);
1918                            queue.push_back(row);
1919                        }
1920                    }
1921                }
1922            // Also check in-neighbors (undirected traversal).
1923            if let Ok(mut stmt) =
1924                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1925                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1926                    for row in rows.flatten() {
1927                        if visited.insert(row) {
1928                            parent.insert(row, cur);
1929                            queue.push_back(row);
1930                        }
1931                    }
1932                }
1933        }
1934
1935        if !parent.contains_key(&to_id) && to_id != from_id {
1936            return Ok(None);
1937        }
1938
1939        let mut path = Vec::new();
1940        let mut cur = to_id;
1941        path.push(cur);
1942        while let Some(&p) = parent.get(&cur) {
1943            path.push(p);
1944            cur = p;
1945            if cur == from_id {
1946                break;
1947            }
1948        }
1949        path.reverse();
1950
1951        let mut name_path = Vec::with_capacity(path.len());
1952        for id in path {
1953            if let Ok(name) = conn.query_row(
1954                "SELECT name FROM entity WHERE id = ?1",
1955                params![id],
1956                |row| row.get::<_, String>(0),
1957            ) {
1958                name_path.push(name);
1959            }
1960        }
1961
1962        Ok(Some(name_path))
1963    }
1964
1965    pub fn compact(&self) -> Result<()> {
1966        let conn = self.writer.lock();
1967        conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
1968        Ok(())
1969    }
1970
1971    pub fn neighbors(
1972        &self,
1973        name: &str,
1974        direction: Direction,
1975        rtype: Option<&str>,
1976        depth: u32,
1977    ) -> Result<String> {
1978        self._traverse(name, direction, rtype, depth, true)
1979    }
1980
1981    pub fn extract_subgraph(
1982        &self,
1983        names: &[String],
1984        depth: u32,
1985    ) -> Result<String> {
1986        if names.is_empty() {
1987            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1988        }
1989
1990        let conn = self.readers.get();
1991        let mut all_entity_ids: HashSet<i64> = HashSet::new();
1992        let mut frontier: HashSet<i64> = HashSet::new();
1993        let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
1994
1995        // Resolve seed entities.
1996        for name in names {
1997            let h = name_hash(name);
1998            if let Ok(Some(id)) = conn
1999                .query_row(
2000                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2001                    params![h, name],
2002                    |row| row.get::<_, i64>(0),
2003                )
2004                .map(Some)
2005                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2006            {
2007                all_entity_ids.insert(id);
2008                frontier.insert(id);
2009            }
2010        }
2011
2012        let mut current_depth = 0u32;
2013        while current_depth < depth && !frontier.is_empty() {
2014            let mut next_frontier: HashSet<i64> = HashSet::new();
2015
2016            // Collect relations for current frontier.
2017            for fid in &frontier {
2018                if let Ok(mut stmt) = conn.prepare_cached(
2019                    "SELECT from_id, to_id, type_id FROM relation WHERE from_id = ?1",
2020                )
2021                    && let Ok(rows) =
2022                        stmt.query_map(params![fid], |row| {
2023                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2024                        })
2025                    {
2026                        for row in rows.flatten() {
2027                            let (from_id, to_id, type_id) = row;
2028                            all_rel_pairs.insert((from_id, to_id, type_id));
2029                            if all_entity_ids.insert(to_id) {
2030                                next_frontier.insert(to_id);
2031                            }
2032                        }
2033                    }
2034                if let Ok(mut stmt) = conn.prepare_cached(
2035                    "SELECT from_id, to_id, type_id FROM relation WHERE to_id = ?1",
2036                )
2037                    && let Ok(rows) =
2038                        stmt.query_map(params![fid], |row| {
2039                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?))
2040                        })
2041                    {
2042                        for row in rows.flatten() {
2043                            let (from_id, to_id, type_id) = row;
2044                            all_rel_pairs.insert((from_id, to_id, type_id));
2045                            if all_entity_ids.insert(from_id) {
2046                                next_frontier.insert(from_id);
2047                            }
2048                        }
2049                    }
2050            }
2051            frontier = next_frontier;
2052            current_depth += 1;
2053        }
2054
2055        let entities_json: String = {
2056            if all_entity_ids.is_empty() {
2057                "[]".to_string()
2058            } else {
2059                let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2060                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2061                let sql = format!(
2062                    "SELECT COALESCE(json_group_array(json_object(
2063                        'name', e.name,
2064                        'entityType', t.name,
2065                        'observations', COALESCE((
2066                            SELECT json_group_array(o.body ORDER BY o.idx)
2067                            FROM observation o WHERE o.entity_id = e.id
2068                        ), json('[]'))
2069                    ) ORDER BY e.id), json('[]'))
2070                    FROM entity e
2071                    JOIN type_dict t ON t.id = e.type_id
2072                    WHERE e.id IN ({}) AND e.flags = 0",
2073                    placeholders.join(",")
2074                );
2075                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2076                    row.get::<_, String>(0)
2077                })
2078                .unwrap_or_else(|_| "[]".to_string())
2079            }
2080        };
2081
2082        let relations_json: String = {
2083            if all_rel_pairs.is_empty() {
2084                "[]".to_string()
2085            } else {
2086                let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2087                let sql = format!(
2088                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2089                    SELECT COALESCE(json_group_array(json_object(
2090                        'from', e1.name,
2091                        'to', e2.name,
2092                        'relationType', t.name
2093                    )), json('[]'))
2094                    FROM r
2095                    JOIN entity e1 ON e1.id = r.from_id
2096                    JOIN entity e2 ON e2.id = r.to_id
2097                    JOIN type_dict t ON t.id = r.type_id
2098                    WHERE e1.flags = 0 AND e2.flags = 0",
2099                    vals.join(", ")
2100                );
2101                let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2102                    .flat_map(|(f, t, tp)| {
2103                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2104                    })
2105                    .collect();
2106                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2107                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2108                    .unwrap_or_else(|_| "[]".to_string())
2109            }
2110        };
2111
2112        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2113        out.push_str("{\"entities\":");
2114        out.push_str(&entities_json);
2115        out.push_str(",\"relations\":");
2116        out.push_str(&relations_json);
2117        out.push('}');
2118        Ok(out)
2119    }
2120
2121    pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2122        self.get_entity(name)?
2123            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2124    }
2125
2126    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2127        let conn = self.readers.get();
2128        select_all_types(&conn, 0).unwrap_or_default()
2129    }
2130
2131    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2132        let conn = self.readers.get();
2133        select_all_types(&conn, 1).unwrap_or_default()
2134    }
2135
2136    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2137        names
2138            .iter()
2139            .map(|n| self.get_entity(n).unwrap_or(None))
2140            .collect()
2141    }
2142
2143    pub fn find_all_paths(
2144        &self,
2145        from: &str,
2146        to: &str,
2147        max_depth: usize,
2148        max_paths: usize,
2149    ) -> Result<Vec<Vec<String>>> {
2150        let conn = self.readers.get();
2151        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2152            Some(v) => v,
2153            None => {
2154                return Err(MCSError::InvalidParams(format!(
2155                    "Source entity '{from}' not found"
2156                )))
2157            }
2158        };
2159        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2160            Some(v) => v,
2161            None => {
2162                return Err(MCSError::InvalidParams(format!(
2163                    "Target entity '{to}' not found"
2164                )))
2165            }
2166        };
2167
2168        if from_id == to_id {
2169            return Ok(vec![vec![from.to_string()]]);
2170        }
2171
2172        // BFS enumerating all paths up to max_depth.
2173        let mut all_paths: Vec<Vec<i64>> = Vec::new();
2174        let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2175        queue.push_back((from_id, vec![from_id]));
2176
2177        while let Some((cur, path)) = queue.pop_front() {
2178            if all_paths.len() >= max_paths {
2179                break;
2180            }
2181            if path.len() > max_depth {
2182                continue;
2183            }
2184
2185            // Out-neighbors.
2186            if let Ok(mut stmt) =
2187                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2188                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2189                    for next_id in rows.flatten() {
2190                        if next_id == to_id {
2191                            let mut full_path = path.clone();
2192                            full_path.push(next_id);
2193                            all_paths.push(full_path);
2194                            if all_paths.len() >= max_paths {
2195                                break;
2196                            }
2197                        } else if !path.contains(&next_id) && path.len() < max_depth {
2198                            let mut new_path = path.clone();
2199                            new_path.push(next_id);
2200                            queue.push_back((next_id, new_path));
2201                        }
2202                    }
2203                }
2204
2205            // In-neighbors (undirected).
2206            if let Ok(mut stmt) =
2207                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2208                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2209                    for next_id in rows.flatten() {
2210                        if next_id == to_id {
2211                            let mut full_path = path.clone();
2212                            full_path.push(next_id);
2213                            all_paths.push(full_path);
2214                            if all_paths.len() >= max_paths {
2215                                break;
2216                            }
2217                        } else if !path.contains(&next_id) && path.len() < max_depth {
2218                            let mut new_path = path.clone();
2219                            new_path.push(next_id);
2220                            queue.push_back((next_id, new_path));
2221                        }
2222                    }
2223                }
2224        }
2225
2226        // Convert ids to names.
2227        let mut named_paths: Vec<Vec<String>> = Vec::new();
2228        for path_ids in all_paths {
2229            let mut named = Vec::with_capacity(path_ids.len());
2230            for id in path_ids {
2231                if let Ok(name) = conn.query_row(
2232                    "SELECT name FROM entity WHERE id = ?1",
2233                    params![id],
2234                    |row| row.get::<_, String>(0),
2235                ) {
2236                    named.push(name);
2237                }
2238            }
2239            named_paths.push(named);
2240        }
2241
2242        Ok(named_paths)
2243    }
2244
2245    /// Export the whole graph as a JSON string. `max_rows` caps both the entity
2246    /// and relation arrays so a pathologically large graph cannot be coerced
2247    /// into an unbounded in-memory string (DoS guard); callers pass a generous
2248    /// constant. A negative value means "no limit".
2249    pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2250        let conn = self.readers.get();
2251        // Only JSON is supported; the format argument is accepted for forward
2252        // compatibility.
2253        conn.query_row(
2254            "SELECT json_object(
2255                'entities', COALESCE((
2256                    SELECT json_group_array(json_object(
2257                        'name', e.name,
2258                        'entityType', t.name,
2259                        'observations', COALESCE((
2260                            SELECT json_group_array(o.body ORDER BY o.idx)
2261                            FROM observation o WHERE o.entity_id = e.id
2262                        ), json('[]'))
2263                    ) ORDER BY e.id)
2264                    FROM (
2265                        SELECT id, name, type_id FROM entity
2266                        WHERE flags = 0 ORDER BY id LIMIT ?1
2267                    ) e
2268                    JOIN type_dict t ON t.id = e.type_id
2269                ), json('[]')),
2270                'relations', COALESCE((
2271                    SELECT json_group_array(json_object(
2272                        'from', e1.name,
2273                        'to', e2.name,
2274                        'relationType', t.name
2275                    ))
2276                    FROM (
2277                        SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2278                    ) r
2279                    JOIN entity e1 ON e1.id = r.from_id
2280                    JOIN entity e2 ON e2.id = r.to_id
2281                    JOIN type_dict t ON t.id = r.type_id
2282                    WHERE e1.flags = 0 AND e2.flags = 0
2283                ), json('[]'))
2284            )",
2285            params![max_rows],
2286            |row| row.get::<_, String>(0),
2287        )
2288        .map_err(sqlite_err)
2289    }
2290
2291    pub fn wipe(&self) -> Result<()> {
2292        let conn = self.writer.lock();
2293        // `name_fts`/`obs_fts` are external-content FTS5 tables; the supported
2294        // way to empty them is the special `'delete-all'` command, not a bare
2295        // `DELETE FROM` (which is invalid for external-content tables and can
2296        // leave the index inconsistent). Run it after clearing the content rows.
2297        conn.execute_batch(
2298            "DELETE FROM observation;
2299             DELETE FROM relation;
2300             DELETE FROM entity;
2301             DELETE FROM type_dict;
2302             INSERT INTO name_fts(name_fts) VALUES('delete-all');
2303             INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2304             UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2305             UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2306        )
2307        .map_err(sqlite_err)?;
2308        self.seq_entity.store(0, Ordering::Relaxed);
2309        self.seq_obs.store(0, Ordering::Relaxed);
2310        self.cache.lock().clear();
2311        Ok(())
2312    }
2313
2314    /// Periodic database maintenance: WAL checkpoint, query planner analysis,
2315    /// and FTS index optimization. Call from a background timer.
2316    pub fn run_maintenance(&self) -> Result<()> {
2317        let conn = self.writer.lock();
2318
2319        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2320            .map_err(sqlite_err)?;
2321
2322        conn.execute_batch("PRAGMA optimize(0x10000);")
2323            .map_err(sqlite_err)?;
2324
2325        conn.execute_batch(
2326            "INSERT INTO name_fts(name_fts) VALUES('optimize');
2327             INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2328        )
2329        .map_err(sqlite_err)?;
2330
2331        Ok(())
2332    }
2333
2334    fn _traverse(
2335        &self,
2336        name: &str,
2337        direction: Direction,
2338        rtype: Option<&str>,
2339        depth: u32,
2340        // unused — we always include relations; the caller controls via depth
2341        _include_relations: bool,
2342    ) -> Result<String> {
2343        let conn = self.readers.get();
2344        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2345            Some(v) => v,
2346            None => {
2347                return Err(MCSError::InvalidParams(format!(
2348                    "Entity '{name}' not found"
2349                )))
2350            }
2351        };
2352
2353        let mut all_ids: HashSet<i64> = HashSet::new();
2354        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2355        let mut frontier: HashSet<i64> = HashSet::new();
2356        all_ids.insert(start_id);
2357        frontier.insert(start_id);
2358
2359        // Read-only type resolution. A requested-but-missing type uses the
2360        // sentinel id -1 (matches no edge), so traversal yields just the start
2361        // entity instead of falling back to "no type filter" and walking every
2362        // edge. `get_type_id` is avoided here: it inserts and cannot run on the
2363        // `query_only` reader connection.
2364        let type_filter: Option<i64> = rtype
2365            .filter(|rt| !rt.is_empty())
2366            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2367
2368        // Pre-compile all four possible queries outside the loop.
2369        let mut q_out_t = conn.prepare_cached(
2370            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2371        let mut q_out   = conn.prepare_cached(
2372            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2373        let mut q_in_t  = conn.prepare_cached(
2374            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2375        let mut q_in    = conn.prepare_cached(
2376            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2377
2378        let mut cur_depth = 0u32;
2379        while cur_depth < depth && !frontier.is_empty() {
2380            let mut next_frontier: HashSet<i64> = HashSet::new();
2381
2382            for &fid in &frontier {
2383                if direction == Direction::Outgoing || direction == Direction::Both {
2384                    if let Some(tid) = type_filter {
2385                        if let Ok(ref mut stmt) = q_out_t
2386                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2387                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2388                            }) {
2389                                for row in rows.flatten() {
2390                                    let (to_id, t_id) = row;
2391                                    all_rels.insert((fid, to_id, t_id));
2392                                    if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2393                                }
2394                            }
2395                    } else if let Ok(ref mut stmt) = q_out
2396                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2397                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2398                        }) {
2399                            for row in rows.flatten() {
2400                                let (to_id, t_id) = row;
2401                                all_rels.insert((fid, to_id, t_id));
2402                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2403                            }
2404                        }
2405                }
2406
2407                if direction == Direction::Incoming || direction == Direction::Both {
2408                    if let Some(tid) = type_filter {
2409                        if let Ok(ref mut stmt) = q_in_t
2410                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2411                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2412                            }) {
2413                                for row in rows.flatten() {
2414                                    let (from_id, t_id) = row;
2415                                    all_rels.insert((from_id, fid, t_id));
2416                                    if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2417                                }
2418                            }
2419                    } else if let Ok(ref mut stmt) = q_in
2420                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2421                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2422                        }) {
2423                            for row in rows.flatten() {
2424                                let (from_id, t_id) = row;
2425                                all_rels.insert((from_id, fid, t_id));
2426                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2427                            }
2428                        }
2429                }
2430            }
2431
2432            frontier = next_frontier;
2433            cur_depth += 1;
2434        }
2435
2436        let entities_json: String = {
2437            if all_ids.is_empty() {
2438                "[]".to_string()
2439            } else {
2440                let ids: Vec<i64> = all_ids.iter().copied().collect();
2441                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2442                let sql = format!(
2443                    "SELECT COALESCE(json_group_array(json_object(
2444                        'name', e.name,
2445                        'entityType', t.name,
2446                        'observations', COALESCE((
2447                            SELECT json_group_array(o.body ORDER BY o.idx)
2448                            FROM observation o WHERE o.entity_id = e.id
2449                        ), json('[]'))
2450                    ) ORDER BY e.id), json('[]'))
2451                    FROM entity e
2452                    JOIN type_dict t ON t.id = e.type_id
2453                    WHERE e.id IN ({}) AND e.flags = 0",
2454                    placeholders.join(",")
2455                );
2456                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2457                    row.get::<_, String>(0)
2458                })
2459                .unwrap_or_else(|_| "[]".to_string())
2460            }
2461        };
2462
2463        let relations_json: String = {
2464            if all_rels.is_empty() {
2465                "[]".to_string()
2466            } else {
2467                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2468                let sql = format!(
2469                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2470                    SELECT COALESCE(json_group_array(json_object(
2471                        'from', e1.name,
2472                        'to', e2.name,
2473                        'relationType', t.name
2474                    )), json('[]'))
2475                    FROM r
2476                    JOIN entity e1 ON e1.id = r.from_id
2477                    JOIN entity e2 ON e2.id = r.to_id
2478                    JOIN type_dict t ON t.id = r.type_id
2479                    WHERE e1.flags = 0 AND e2.flags = 0",
2480                    vals.join(", ")
2481                );
2482                let params: Vec<&dyn ToSql> = all_rels.iter()
2483                    .flat_map(|(f, t, tp)| {
2484                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2485                    })
2486                    .collect();
2487                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2488                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2489                    .unwrap_or_else(|_| "[]".to_string())
2490            }
2491        };
2492
2493        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2494        out.push_str("{\"entities\":");
2495        out.push_str(&entities_json);
2496        out.push_str(",\"relations\":");
2497        out.push_str(&relations_json);
2498        out.push('}');
2499        Ok(out)
2500    }
2501}
2502
2503// ── Tests ────────────────────────────────────────────────────────────────
2504
2505#[cfg(test)]
2506mod tests {
2507    use super::*;
2508    use serde_json::Value;
2509    use std::ops::Deref;
2510    use std::path::PathBuf;
2511
2512    struct TestKg(GraphHandle, PathBuf);
2513
2514    impl Deref for TestKg {
2515        type Target = GraphHandle;
2516        fn deref(&self) -> &GraphHandle {
2517            &self.0
2518        }
2519    }
2520
2521    impl Drop for TestKg {
2522        fn drop(&mut self) {
2523            cleanup_db(&self.1);
2524        }
2525    }
2526
2527    fn cleanup_db(path: &std::path::Path) {
2528        let _ = std::fs::remove_file(path);
2529        let _ = std::fs::remove_file(path.with_extension("db-wal"));
2530        let _ = std::fs::remove_file(path.with_extension("db-shm"));
2531    }
2532
2533    fn new_kg() -> TestKg {
2534        use std::sync::atomic::AtomicU64;
2535        use std::sync::atomic::Ordering;
2536        static COUNTER: AtomicU64 = AtomicU64::new(0);
2537        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2538        let dir = std::env::temp_dir();
2539        let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2540        cleanup_db(&path);
2541        let kg = GraphHandle::new(&path, Durability::Async, 268435456, NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2542        TestKg(kg, path)
2543    }
2544
2545    #[test]
2546    fn test_create_and_get_entity() {
2547        let kg = new_kg();
2548        let entities = vec![Entity {
2549            name: "test".into(),
2550            entity_type: "person".into(),
2551            observations: vec!["obs1".into(), "obs2".into()],
2552        }];
2553        let created = kg.create_entities(&entities).unwrap();
2554        assert_eq!(created.len(), 1);
2555
2556        let got = kg.get_entity("test").unwrap().unwrap();
2557        assert_eq!(got.name, "test");
2558        assert_eq!(got.entity_type, "person");
2559        assert_eq!(got.observations, vec!["obs1", "obs2"]);
2560    }
2561
2562    #[test]
2563    fn test_get_nonexistent() {
2564        let kg = new_kg();
2565        assert!(kg.get_entity("nonexistent").unwrap().is_none());
2566    }
2567
2568    #[test]
2569    fn test_delete_entity() {
2570        let kg = new_kg();
2571        kg.create_entities(&[Entity {
2572            name: "del".into(),
2573            entity_type: "t".into(),
2574            observations: vec![],
2575        }])
2576        .unwrap();
2577        assert!(kg.get_entity("del").unwrap().is_some());
2578        kg.delete_entities(&["del".to_string()]).unwrap();
2579        assert!(kg.get_entity("del").unwrap().is_none());
2580    }
2581
2582    #[test]
2583    fn test_add_and_delete_observations() {
2584        let kg = new_kg();
2585        kg.create_entities(&[Entity {
2586            name: "obs_test".into(),
2587            entity_type: "t".into(),
2588            observations: vec!["a".into()],
2589        }])
2590        .unwrap();
2591
2592        let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2593        assert_eq!(added.len(), 2);
2594
2595        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2596        assert!(ent.observations.contains(&"b".into()));
2597        assert!(ent.observations.contains(&"c".into()));
2598
2599        kg.delete_observations("obs_test", &["b".into()]).unwrap();
2600        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2601        assert!(!ent.observations.contains(&"b".into()));
2602        assert!(ent.observations.contains(&"c".into()));
2603        assert!(ent.observations.contains(&"a".into()));
2604    }
2605
2606    #[test]
2607    fn test_create_relations() {
2608        let kg = new_kg();
2609        kg.create_entities(&[
2610            Entity {
2611                name: "A".into(),
2612                entity_type: "node".into(),
2613                observations: vec![],
2614            },
2615            Entity {
2616                name: "B".into(),
2617                entity_type: "node".into(),
2618                observations: vec![],
2619            },
2620        ])
2621        .unwrap();
2622
2623        let rels = kg
2624            .create_relations(&[Relation {
2625                from: "A".into(),
2626                to: "B".into(),
2627                relation_type: "edge".into(),
2628            }])
2629            .unwrap();
2630        assert_eq!(rels.len(), 1);
2631
2632        assert_eq!(kg.get_entity_count().unwrap(), 2);
2633        assert_eq!(kg.get_relation_count().unwrap(), 1);
2634    }
2635
2636    #[test]
2637    fn test_search_nodes() {
2638        let kg = new_kg();
2639        kg.create_entities(&[Entity {
2640            name: "Einstein".into(),
2641            entity_type: "scientist".into(),
2642            observations: vec!["physics".into(), "relativity".into()],
2643        }])
2644        .unwrap();
2645
2646        let results = kg.search_nodes_filtered("physics", None, 0, 10);
2647        assert_eq!(results.len(), 1);
2648        assert_eq!(results[0].name, "Einstein");
2649
2650        let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2651        assert_eq!(results.len(), 1);
2652
2653        let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2654        assert_eq!(results.len(), 0);
2655    }
2656
2657    #[test]
2658    fn test_find_path() {
2659        let kg = new_kg();
2660        kg.create_entities(&[
2661            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2662            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2663            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2664        ]).unwrap();
2665
2666        kg.create_relations(&[
2667            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2668            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2669        ]).unwrap();
2670
2671        let path = kg.find_path("A", "C").unwrap().unwrap();
2672        assert_eq!(path, vec!["A", "B", "C"]);
2673    }
2674
2675    #[test]
2676    fn test_degree() {
2677        let kg = new_kg();
2678        kg.create_entities(&[
2679            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2680            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2681            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2682        ]).unwrap();
2683
2684        kg.create_relations(&[
2685            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2686            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2687        ]).unwrap();
2688
2689        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2690        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2691        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2692    }
2693
2694    #[test]
2695    fn test_neighbors() {
2696        let kg = new_kg();
2697        kg.create_entities(&[
2698            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2699            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2700        ]).unwrap();
2701
2702        kg.create_relations(&[Relation {
2703            from: "A".into(), to: "B".into(), relation_type: "e".into(),
2704        }]).unwrap();
2705
2706        let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2707        let v: Value = serde_json::from_str(&result).unwrap();
2708        assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2709        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2710    }
2711
2712    #[test]
2713    fn test_open_nodes() {
2714        let kg = new_kg();
2715        kg.create_entities(&[
2716            Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2717            Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2718        ]).unwrap();
2719
2720        kg.create_relations(&[Relation {
2721            from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2722        }]).unwrap();
2723
2724        let result = kg.open_nodes(&["X".into()]);
2725        let v: Value = serde_json::from_str(&result).unwrap();
2726        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2727        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2728    }
2729
2730    #[test]
2731    fn test_entities_exist() {
2732        let kg = new_kg();
2733        kg.create_entities(&[Entity {
2734            name: "exists".into(), entity_type: "t".into(), observations: vec![],
2735        }]).unwrap();
2736
2737        let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2738        assert_eq!(res, vec![true, false]);
2739    }
2740
2741    #[test]
2742    fn test_describe_entity() {
2743        let kg = new_kg();
2744        kg.create_entities(&[Entity {
2745            name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2746        }]).unwrap();
2747
2748        let entity = kg.describe_entity("desc").unwrap();
2749        assert_eq!(entity.name, "desc");
2750    }
2751
2752    #[test]
2753    fn test_entity_type_counts() {
2754        let kg = new_kg();
2755        kg.create_entities(&[
2756            Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2757            Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2758            Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2759        ]).unwrap();
2760
2761        let counts = kg.entity_type_counts();
2762        let map: FxHashMap<_, _> = counts.into_iter().collect();
2763        assert_eq!(map.get("person"), Some(&2));
2764        assert_eq!(map.get("place"), Some(&1));
2765    }
2766
2767    #[test]
2768    fn test_relation_type_counts() {
2769        let kg = new_kg();
2770        kg.create_entities(&[
2771            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2772            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2773            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2774        ]).unwrap();
2775
2776        kg.create_relations(&[
2777            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2778            Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2779        ]).unwrap();
2780
2781        let counts = kg.relation_type_counts();
2782        let map: FxHashMap<_, _> = counts.into_iter().collect();
2783        assert_eq!(map.get("knows"), Some(&2));
2784    }
2785
2786    #[test]
2787    fn test_upsert_entities() {
2788        let kg = new_kg();
2789        kg.create_entities(&[Entity {
2790            name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2791        }]).unwrap();
2792
2793        // Upsert with new type and additional observation.
2794        kg.upsert_entities(&[Entity {
2795            name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2796        }]).unwrap();
2797
2798        let ent = kg.get_entity("u").unwrap().unwrap();
2799        assert_eq!(ent.entity_type, "new");
2800        assert!(ent.observations.contains(&"added".into()));
2801        assert!(ent.observations.contains(&"existing".into()));
2802    }
2803
2804    #[test]
2805    fn test_merge_entities() {
2806        let kg = new_kg();
2807        kg.create_entities(&[
2808            Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2809            Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2810        ]).unwrap();
2811
2812        kg.create_relations(&[Relation {
2813            from: "source".into(), to: "target".into(), relation_type: "e".into(),
2814        }]).unwrap();
2815
2816        let merged = kg.merge_entities("source", "target").unwrap();
2817        assert_eq!(merged.name, "target");
2818        assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2819    }
2820
2821    #[test]
2822    fn test_find_all_paths() {
2823        let kg = new_kg();
2824        kg.create_entities(&[
2825            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2826            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2827            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2828        ]).unwrap();
2829
2830        kg.create_relations(&[
2831            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2832            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2833            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2834        ]).unwrap();
2835
2836        let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2837        assert!(paths.len() >= 2);
2838    }
2839
2840    #[test]
2841    fn test_batch_get_entities() {
2842        let kg = new_kg();
2843        kg.create_entities(&[
2844            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2845            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2846        ]).unwrap();
2847
2848        let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2849        assert_eq!(results.len(), 3);
2850        assert!(results[0].is_some());
2851        assert!(results[1].is_none());
2852        assert!(results[2].is_some());
2853    }
2854
2855    #[test]
2856    fn test_export_graph() {
2857        let kg = new_kg();
2858        kg.create_entities(&[Entity {
2859            name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2860        }]).unwrap();
2861
2862        let exported = kg.export("json", i64::MAX).unwrap();
2863        assert!(exported.contains("exp"));
2864        assert!(exported.contains("o"));
2865    }
2866
2867    #[test]
2868    fn test_graph_stats() {
2869        let kg = new_kg();
2870        assert_eq!(kg.get_entity_count().unwrap(), 0);
2871        assert_eq!(kg.get_relation_count().unwrap(), 0);
2872
2873        kg.create_entities(&[Entity {
2874            name: "s".into(), entity_type: "t".into(), observations: vec![],
2875        }]).unwrap();
2876
2877        assert_eq!(kg.get_entity_count().unwrap(), 1);
2878    }
2879
2880    #[test]
2881    fn test_read_graph_filtered() {
2882        let kg = new_kg();
2883        kg.create_entities(&[
2884            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2885            Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2886        ]).unwrap();
2887
2888        let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2889        let v: Value = serde_json::from_str(&out).unwrap();
2890        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2891        assert_eq!(v["entities"][0]["name"], "p1");
2892    }
2893
2894    #[test]
2895    fn test_wipe() {
2896        let kg = new_kg();
2897        kg.create_entities(&[Entity {
2898            name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2899        }]).unwrap();
2900        assert_eq!(kg.get_entity_count().unwrap(), 1);
2901
2902        kg.wipe().unwrap();
2903        assert_eq!(kg.get_entity_count().unwrap(), 0);
2904    }
2905
2906    #[test]
2907    fn test_push_json_str() {
2908        let mut buf = String::new();
2909        push_json_str(&mut buf, "hello");
2910        assert_eq!(buf, "\"hello\"");
2911        let mut buf = String::new();
2912        push_json_str(&mut buf, "he\"llo");
2913        assert_eq!(buf, "\"he\\\"llo\"");
2914    }
2915
2916    // ── create_entities edge cases ────────────────────────────────────
2917
2918    #[test]
2919    fn test_create_entities_empty_input() {
2920        let kg = new_kg();
2921        let created = kg.create_entities(&[]).unwrap();
2922        assert!(created.is_empty());
2923    }
2924
2925    #[test]
2926    fn test_create_entities_skip_empty_name() {
2927        let kg = new_kg();
2928        let created = kg.create_entities(&[Entity {
2929            name: "".into(),
2930            entity_type: "t".into(),
2931            observations: vec![],
2932        }])
2933        .unwrap();
2934        assert!(created.is_empty());
2935        assert_eq!(kg.get_entity_count().unwrap(), 0);
2936    }
2937
2938    #[test]
2939    fn test_create_entities_duplicate_names() {
2940        let kg = new_kg();
2941        let e = Entity {
2942            name: "dup".into(),
2943            entity_type: "t".into(),
2944            observations: vec!["obs".into()],
2945        };
2946        let first = kg.create_entities(&[e.clone()]).unwrap();
2947        assert_eq!(first.len(), 1);
2948        let second = kg.create_entities(&[e.clone()]).unwrap();
2949        assert!(second.is_empty());
2950        assert_eq!(kg.get_entity_count().unwrap(), 1);
2951    }
2952
2953    #[test]
2954    fn test_create_entities_partial_duplicates() {
2955        let kg = new_kg();
2956        let created = kg.create_entities(&[
2957            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2958            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2959        ]).unwrap();
2960        assert_eq!(created.len(), 2);
2961
2962        let second = kg.create_entities(&[
2963            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2964            Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
2965        ]).unwrap();
2966        assert_eq!(second.len(), 1); // only c created
2967        assert_eq!(second[0].name, "c");
2968        assert_eq!(kg.get_entity_count().unwrap(), 3);
2969    }
2970
2971    #[test]
2972    fn test_create_entities_mixed_empty_and_valid() {
2973        let kg = new_kg();
2974        let created = kg.create_entities(&[
2975            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
2976            Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
2977            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
2978        ]).unwrap();
2979        assert_eq!(created.len(), 1);
2980        assert_eq!(created[0].name, "valid");
2981        assert_eq!(kg.get_entity_count().unwrap(), 1);
2982    }
2983
2984    #[test]
2985    fn test_create_entities_same_name_in_batch() {
2986        let kg = new_kg();
2987        let created = kg.create_entities(&[
2988            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
2989            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
2990        ]).unwrap();
2991        assert_eq!(created.len(), 1);
2992        assert_eq!(kg.get_entity_count().unwrap(), 1);
2993    }
2994
2995    // ── create_relations edge cases ───────────────────────────────────
2996
2997    #[test]
2998    fn test_create_relations_empty_input() {
2999        let kg = new_kg();
3000        let rels = kg.create_relations(&[]).unwrap();
3001        assert!(rels.is_empty());
3002    }
3003
3004    #[test]
3005    fn test_create_relations_nonexistent_from() {
3006        let kg = new_kg();
3007        kg.create_entities(&[Entity {
3008            name: "B".into(), entity_type: "t".into(), observations: vec![],
3009        }]).unwrap();
3010
3011        let rels = kg.create_relations(&[Relation {
3012            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3013        }]).unwrap();
3014        assert!(rels.is_empty());
3015        assert_eq!(kg.get_relation_count().unwrap(), 0);
3016    }
3017
3018    #[test]
3019    fn test_create_relations_nonexistent_to() {
3020        let kg = new_kg();
3021        kg.create_entities(&[Entity {
3022            name: "A".into(), entity_type: "t".into(), observations: vec![],
3023        }]).unwrap();
3024
3025        let rels = kg.create_relations(&[Relation {
3026            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3027        }]).unwrap();
3028        assert!(rels.is_empty());
3029        assert_eq!(kg.get_relation_count().unwrap(), 0);
3030    }
3031
3032    #[test]
3033    fn test_create_relations_both_nonexistent() {
3034        let kg = new_kg();
3035        let rels = kg.create_relations(&[Relation {
3036            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3037        }]).unwrap();
3038        assert!(rels.is_empty());
3039    }
3040
3041    #[test]
3042    fn test_create_relations_self_loop() {
3043        let kg = new_kg();
3044        kg.create_entities(&[Entity {
3045            name: "self".into(), entity_type: "t".into(), observations: vec![],
3046        }]).unwrap();
3047
3048        let rels = kg.create_relations(&[Relation {
3049            from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3050        }]).unwrap();
3051        assert_eq!(rels.len(), 1);
3052        assert_eq!(kg.get_relation_count().unwrap(), 1);
3053        assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3054        assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3055    }
3056
3057    #[test]
3058    fn test_create_relations_duplicate() {
3059        let kg = new_kg();
3060        kg.create_entities(&[
3061            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3062            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3063        ]).unwrap();
3064
3065        let r = Relation {
3066            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3067        };
3068        let first = kg.create_relations(&[r.clone()]).unwrap();
3069        assert_eq!(first.len(), 1);
3070
3071        let second = kg.create_relations(&[r.clone()]).unwrap();
3072        assert!(second.is_empty());
3073        assert_eq!(kg.get_relation_count().unwrap(), 1);
3074    }
3075
/// A relation using a previously unseen relation type is created, and that
/// type then shows up in `relation_type_counts` with a count of one.
#[test]
fn test_create_relations_new_type_auto_created() {
    let kg = new_kg();
    let node = |name: &str| Entity {
        name: name.into(),
        entity_type: "t".into(),
        observations: vec![],
    };
    kg.create_entities(&[node("A"), node("B")]).unwrap();

    let edge = Relation {
        from: "A".into(),
        to: "B".into(),
        relation_type: "brand_new_type".into(),
    };
    let created = kg.create_relations(&[edge]).unwrap();
    assert_eq!(created.len(), 1);

    // Index the (type, count) pairs by type for a direct lookup.
    let by_type: FxHashMap<_, _> = kg.relation_type_counts().into_iter().collect();
    assert_eq!(by_type.get("brand_new_type"), Some(&1));
}
3093
/// After adding A->B and A->C, the per-direction degrees of A, B and C
/// reflect exactly those two edges.
#[test]
fn test_create_relations_degree_updates() {
    let kg = new_kg();
    let node = |name: &str| Entity {
        name: name.into(),
        entity_type: "t".into(),
        observations: vec![],
    };
    kg.create_entities(&[node("A"), node("B"), node("C")]).unwrap();

    let edge = |from: &str, to: &str| Relation {
        from: from.into(),
        to: to.into(),
        relation_type: "e".into(),
    };
    kg.create_relations(&[edge("A", "B"), edge("A", "C")]).unwrap();

    // (entity, direction, expected degree), checked in the listed order.
    let expectations = [
        ("A", Direction::Outgoing, 2),
        ("A", Direction::Incoming, 0),
        ("B", Direction::Incoming, 1),
        ("C", Direction::Incoming, 1),
        ("A", Direction::Both, 2),
    ];
    for (name, direction, want) in expectations {
        assert_eq!(kg.degree(name, direction).unwrap(), want);
    }
}
3114
/// A relation can be created, deleted, and then created again; the relation
/// count follows 1 -> 0 -> 1 and the re-creation is reported as new.
#[test]
fn test_create_relations_delete_and_recreate() {
    let kg = new_kg();
    kg.create_entities(&[
        Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
        Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
    ])
    .unwrap();

    let edge = Relation {
        from: "A".into(),
        to: "B".into(),
        relation_type: "e".into(),
    };
    // One borrowed single-element slice serves all three calls, so the
    // relation never has to be cloned.
    let batch = std::slice::from_ref(&edge);

    kg.create_relations(batch).unwrap();
    assert_eq!(kg.get_relation_count().unwrap(), 1);

    kg.delete_relations(batch).unwrap();
    assert_eq!(kg.get_relation_count().unwrap(), 0);

    // Recreate after delete
    let recreated = kg.create_relations(batch).unwrap();
    assert_eq!(recreated.len(), 1);
    assert_eq!(kg.get_relation_count().unwrap(), 1);
}
3137
3138    // ── Integration edge cases ────────────────────────────────────────
3139
/// Deleting an entity that is an endpoint of a relation removes both the
/// entity and that relation.
#[test]
fn test_create_entities_then_relations_then_delete_entity_with_relations() {
    let kg = new_kg();
    kg.create_entities(&[
        Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
        Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
    ])
    .unwrap();
    kg.create_relations(&[Relation {
        from: "A".into(),
        to: "B".into(),
        relation_type: "e".into(),
    }])
    .unwrap();

    assert_eq!(kg.get_relation_count().unwrap(), 1);

    // Deleting entity A should also delete the relation
    kg.delete_entities(&["A".into()]).unwrap();
    // Assert the boolean directly rather than comparing it against `true`.
    assert!(kg.get_entity("A").unwrap().is_none());
    assert_eq!(kg.get_relation_count().unwrap(), 0);
}
3158
/// The entity count goes to one after creating an entity that carries three
/// observations, and back to zero once that entity is deleted.
#[test]
fn test_graph_stats_after_entity_with_observations() {
    let kg = new_kg();
    let with_observations = Entity {
        name: "stat".into(),
        entity_type: "t".into(),
        observations: vec!["o1".into(), "o2".into(), "o3".into()],
    };
    kg.create_entities(&[with_observations]).unwrap();

    // Only the entity count is checked here; this test does not read any
    // observation statistic.
    assert_eq!(kg.get_entity_count().unwrap(), 1);

    // Deleting the entity brings the count back down.
    kg.delete_entities(&["stat".into()]).unwrap();
    let remaining = kg.get_entity_count().unwrap();
    assert_eq!(remaining, 0);
}
3175
3176    // ── Helpers for the fix-specific suites ────────────────────────────────
3177
3178    fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3179        use std::sync::atomic::AtomicU64;
3180        static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3181        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3182        let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3183        cleanup_db(&path);
3184        let kg = GraphHandle::new(
3185            &path,
3186            Durability::Async,
3187            268_435_456,
3188            NonZeroUsize::new(10_000).unwrap(),
3189            read_pool_size,
3190        )
3191        .expect("create KG");
3192        TestKg(kg, path)
3193    }
3194
3195    fn seed_line(kg: &GraphHandle, n: usize) {
3196        let entities: Vec<Entity> = (0..n)
3197            .map(|i| Entity {
3198                name: format!("n{i}"),
3199                entity_type: "node".into(),
3200                observations: vec![format!("obs of n{i}")],
3201            })
3202            .collect();
3203        kg.create_entities(&entities).unwrap();
3204        let rels: Vec<Relation> = (0..n.saturating_sub(1))
3205            .map(|i| Relation {
3206                from: format!("n{i}"),
3207                to: format!("n{}", i + 1),
3208                relation_type: "edge".into(),
3209            })
3210            .collect();
3211        if !rels.is_empty() {
3212            kg.create_relations(&rels).unwrap();
3213        }
3214    }
3215
3216    fn count_relations(graph_json: &str) -> usize {
3217        let v: Value = serde_json::from_str(graph_json).unwrap();
3218        v["relations"].as_array().unwrap().len()
3219    }
3220
3221    fn count_entities(graph_json: &str) -> usize {
3222        let v: Value = serde_json::from_str(graph_json).unwrap();
3223        v["entities"].as_array().unwrap().len()
3224    }
3225
3226    // ── Fix #1: reader pool / concurrency ──────────────────────────────────
3227
/// With a reader pool of size one, counting, single-entity lookup and a full
/// graph read all return the seeded data.
#[test]
fn test_pool_size_one_still_works() {
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 5);

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 5);

    let middle = kg.get_entity("n2").unwrap();
    assert!(middle.is_some());

    let graph = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&graph), 5);
}
3237
/// An entity written through `create_entities` is immediately visible to a
/// following `get_entity`, including its observation.
#[test]
fn test_reads_see_committed_writes() {
    let kg = new_kg_with_pool(4);
    let fresh = Entity {
        name: "fresh".into(),
        entity_type: "t".into(),
        observations: vec!["v".into()],
    };
    kg.create_entities(&[fresh]).unwrap();

    // Per the original test's note, this read is served by the reader pool
    // and must see the write that was just committed.
    let fetched = kg.get_entity("fresh").unwrap().unwrap();
    assert_eq!(fetched.observations, vec!["v"]);
}
3253
/// Runs eight reader threads against the graph while one writer thread adds
/// sixty entities, then checks the final entity count and that the last
/// written entity is present. Reader results are deliberately discarded: the
/// readers only need to finish without panicking or deadlocking.
#[test]
fn test_concurrent_readers_consistent() {
    let kg = new_kg_with_pool(4);
    seed_line(&kg, 50);

    std::thread::scope(|scope| {
        // The mix of read calls each reader thread repeats 200 times.
        let hammer_reads = || {
            for _ in 0..200 {
                let _ = kg.get_entity("n10");
                let _ = kg.search_nodes_filtered("obs", None, 0, 10);
                let _ = kg.read_graph_filtered(None, 0, 100);
                let _ = kg.get_entity_count();
                let _ = kg.neighbors("n10", Direction::Both, None, 2);
            }
        };
        for _ in 0..8 {
            scope.spawn(hammer_reads);
        }

        // A single writer inserts w100..w159 while the readers are running.
        scope.spawn(|| {
            for i in 100..160 {
                let added = Entity {
                    name: format!("w{i}"),
                    entity_type: "node".into(),
                    observations: vec![format!("w obs {i}")],
                };
                kg.create_entities(&[added]).unwrap();
            }
        });
    });

    // 50 seeded entities plus the 60 added by the writer.
    assert_eq!(kg.get_entity_count().unwrap(), 110);
    assert!(kg.get_entity("w159").unwrap().is_some());
}
3291
/// Four threads each perform 100 graph reads against a size-1 reader pool;
/// the test passes if they all finish and the entity count is still three.
///
/// NOTE(review): despite the name, this test never attempts a write through
/// the pool. It only checks that contended reads complete and leave the data
/// unchanged. The phantom-type check lives in
/// `test_search_relations_missing_type_returns_empty`.
#[test]
fn test_reader_pool_rejects_writes_internally() {
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 3);

    std::thread::scope(|scope| {
        let read_repeatedly = || {
            for _ in 0..100 {
                let _ = kg.read_graph_filtered(None, 0, 10);
            }
        };
        for _ in 0..4 {
            scope.spawn(read_repeatedly);
        }
    });

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 3);
}
3312
3313    // ── Fix #6: read_graph relation scoping + export bound ─────────────────
3314
/// `read_graph_filtered` returns only relations whose two endpoints are both
/// on the requested page of entities.
#[test]
fn test_read_graph_relations_scoped_to_page() {
    let kg = new_kg_with_pool(2);
    // Line graph n0 -> n1 -> n2 -> n3: four entities, three edges.
    seed_line(&kg, 4);

    // (page limit, expected entities, expected relations)
    let cases = [
        // Everything on one page: all three edges are returned.
        (usize::MAX, 4, 3),
        // Only n0: its edge n0->n1 leaves the page, so nothing is returned.
        (1, 1, 0),
        // n0 and n1: n0->n1 is inside; n1->n2 straddles the page and is dropped.
        (2, 2, 1),
    ];
    for (limit, want_entities, want_relations) in cases {
        let page = kg.read_graph_filtered(None, 0, limit).unwrap();
        assert_eq!(count_entities(&page), want_entities);
        assert_eq!(count_relations(&page), want_relations);
    }
}
3338
/// With offset 2 and limit 2 over five seeded entities, the page holds
/// exactly `n2` and `n3`, and neither `n0` nor `n1`.
#[test]
fn test_read_graph_pagination_offset() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    let page = kg.read_graph_filtered(None, 2, 2).unwrap();
    assert_eq!(count_entities(&page), 2);

    // The first two entities are skipped by the offset (the original test
    // notes that entities are ordered by id).
    for skipped in ["\"n0\"", "\"n1\""] {
        assert!(!page.contains(skipped));
    }
    for included in ["\"n2\"", "\"n3\""] {
        assert!(page.contains(included));
    }
}
3351
/// Reading a graph that has no data yields the exact JSON skeleton with
/// empty `entities` and `relations` arrays.
#[test]
fn test_read_graph_empty() {
    let kg = new_kg_with_pool(2);
    let serialized = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    let expected = r#"{"entities":[],"relations":[]}"#;
    assert_eq!(serialized, expected);
}
3358
/// Passing an entity-type filter to `read_graph_filtered` returns only the
/// entities of that type.
#[test]
fn test_read_graph_filtered_by_type() {
    let kg = new_kg_with_pool(2);
    let typed = |name: &str, kind: &str| Entity {
        name: name.into(),
        entity_type: kind.into(),
        observations: vec![],
    };
    kg.create_entities(&[
        typed("p1", "person"),
        typed("q1", "place"),
        typed("p2", "person"),
    ])
    .unwrap();

    let people = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&people), 2);
    assert!(people.contains("\"p1\""));
    assert!(people.contains("\"p2\""));
    // The lone "place" entity must be filtered out.
    assert!(!people.contains("\"q1\""));
}
3374
/// `export` returns every entity and relation when the row cap is
/// `i64::MAX`, and trims both arrays to the cap when it is smaller.
#[test]
fn test_export_respects_max_rows() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    // (row cap, expected entities, expected relations)
    let cases = [
        // Effectively unbounded: 5 entities and their 4 connecting edges.
        (i64::MAX, 5, 4),
        // Capped at two rows: both arrays are truncated to two.
        (2, 2, 2),
    ];
    for (max_rows, want_entities, want_relations) in cases {
        let exported = kg.export("json", max_rows).unwrap();
        assert_eq!(count_entities(&exported), want_entities);
        assert_eq!(count_relations(&exported), want_relations);
    }
}
3390
/// A negative row cap passed to `export` does not truncate the output: all
/// three seeded entities are returned.
#[test]
fn test_export_negative_max_rows_is_unbounded() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    // The original test attributes this to SQLite treating a negative LIMIT
    // as "no limit".
    let exported = kg.export("json", -1).unwrap();
    let entity_total = count_entities(&exported);
    assert_eq!(entity_total, 3);
}
3399
3400    // ── Fix #8: writes remain correct without the per-write PRAGMA optimize ─
3401
/// One hundred single-entity write batches leave the graph with one hundred
/// entities, and a search can still find one from the middle of the run.
#[test]
fn test_many_small_write_batches_stay_consistent() {
    let kg = new_kg_with_pool(2);
    for i in 0..100 {
        let single = Entity {
            name: format!("e{i}"),
            entity_type: "t".into(),
            observations: vec![format!("o{i}")],
        };
        kg.create_entities(&[single]).unwrap();
    }
    assert_eq!(kg.get_entity_count().unwrap(), 100);

    // The needle was written in its own tiny batch, so finding it shows the
    // search index kept up across all the separate writes.
    let matches = kg.search_nodes_filtered("e57", None, 0, 10);
    let found = matches.iter().any(|hit| hit.name == "e57");
    assert!(found);
}
3419
3420    // ── Fix #9: wipe fully resets the FTS indexes ──────────────────────────
3421
/// After `wipe`, the entity count is zero and searches by entity name and by
/// observation text both come back empty.
#[test]
fn test_wipe_clears_name_and_obs_fts() {
    let kg = new_kg_with_pool(2);
    let einstein = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    };
    kg.create_entities(&[einstein]).unwrap();

    // Number of hits for a search term, using the same paging as every probe.
    let hit_count = |term: &str| kg.search_nodes_filtered(term, None, 0, 10).len();

    // Before the wipe, both the name and the observation text are findable.
    assert_eq!(hit_count("Einstein"), 1);
    assert_eq!(hit_count("physics"), 1);

    kg.wipe().unwrap();

    // After the wipe nothing may remain searchable. The original test notes
    // that a bare DELETE on an external-content FTS5 table would have left
    // stale rowids behind.
    assert_eq!(kg.get_entity_count().unwrap(), 0);
    assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
    assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
}
3444
/// Re-creating an entity with the same name after a `wipe` yields exactly
/// one search hit by name, one by a new observation, and an entity count of one.
#[test]
fn test_wipe_then_recreate_search_works() {
    let kg = new_kg_with_pool(2);

    let original = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    };
    kg.create_entities(&[original]).unwrap();
    kg.wipe().unwrap();

    // Same name as before the wipe, now with an additional observation.
    let recreated = Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into(), "relativity".into()],
    };
    kg.create_entities(&[recreated]).unwrap();

    let name_hits = kg.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(name_hits.len(), 1, "exactly one Einstein after recreate");

    let observation_hits = kg.search_nodes_filtered("relativity", None, 0, 10);
    assert_eq!(observation_hits.len(), 1);

    assert_eq!(kg.get_entity_count().unwrap(), 1);
}
3471
3472    // ── Read-only type/entity resolution (introduced by the reader pool) ───
3473
/// Filtering relations by a type that does not exist returns no relations
/// and does not add that type to `relation_type_counts`.
#[test]
fn test_search_relations_missing_type_returns_empty() {
    let kg = new_kg_with_pool(2);
    // All seeded relations have type "edge".
    seed_line(&kg, 3);

    // An unknown type must match nothing, rather than every relation.
    let matches = kg.search_relations(None, None, Some("does_not_exist"));
    assert!(matches.is_empty());

    // The lookup itself must not have registered the unknown type.
    let known_types = kg.relation_type_counts();
    let phantom_absent = known_types.iter().all(|(t, _)| t != "does_not_exist");
    assert!(phantom_absent);
}
3486
/// Filtering relations by a `from` entity that does not exist returns no
/// relations.
#[test]
fn test_search_relations_missing_from_returns_empty() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let matches = kg.search_relations(Some("ghost"), None, None);
    assert!(
        matches.is_empty(),
        "missing 'from' must not match every relation"
    );
}
3494
/// Filtering by an existing `from` entity together with an existing relation
/// type returns exactly the one matching relation, n0 -> n1.
#[test]
fn test_search_relations_existing_filters_still_work() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let matches = kg.search_relations(Some("n0"), None, Some("edge"));
    assert_eq!(matches.len(), 1);

    let only = &matches[0];
    assert_eq!(only.from, "n0");
    assert_eq!(only.to, "n1");
}
3504
/// A neighbor query restricted to a relation type that does not exist
/// returns one entity and no relations.
#[test]
fn test_neighbors_missing_type_returns_only_start() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let neighborhood = kg
        .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
        .unwrap();

    // No edge has the bogus type, so only the start node is expected back.
    let entity_total = count_entities(&neighborhood);
    let relation_total = count_relations(&neighborhood);
    assert_eq!(entity_total, 1);
    assert_eq!(relation_total, 0);
}
3516
/// A neighbor query restricted to an existing relation type follows only
/// edges of that type: from `a`, filtering on "knows" reaches `b` but not `c`.
#[test]
fn test_neighbors_existing_type_filters() {
    let kg = new_kg_with_pool(2);

    let node = |name: &str| Entity {
        name: name.into(),
        entity_type: "n".into(),
        observations: vec![],
    };
    kg.create_entities(&[node("a"), node("b"), node("c")]).unwrap();

    let edge = |to: &str, kind: &str| Relation {
        from: "a".into(),
        to: to.into(),
        relation_type: kind.into(),
    };
    kg.create_relations(&[edge("b", "knows"), edge("c", "likes")]).unwrap();

    let neighborhood = kg
        .neighbors("a", Direction::Outgoing, Some("knows"), 1)
        .unwrap();
    assert!(neighborhood.contains("\"b\""));
    // `c` is reachable only through a "likes" edge, so it must be excluded.
    assert!(!neighborhood.contains("\"c\""));
    assert_eq!(count_relations(&neighborhood), 1);
}
3536}