Skip to main content

mcp_memory/
kg.rs

1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
/// Cap on entities collected in a single traversal (DoS guard).
/// Prevents a dense graph at high depth from allocating unbounded memory.
const MAX_TRAVERSAL_ENTITIES: usize = 500_000;
/// Cap on relations collected in a single traversal; the relation-side
/// counterpart of [`MAX_TRAVERSAL_ENTITIES`].
const MAX_TRAVERSAL_RELS: usize = 2_000_000;
20
21fn sqlite_err(e: rusqlite::Error) -> MCSError {
22    MCSError::IoError(std::io::Error::other(e))
23}
24
25const fn is_not_found(e: &rusqlite::Error) -> bool {
26    matches!(e, rusqlite::Error::QueryReturnedNoRows)
27}
28
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch the duration falls
/// back to its default (zero), so the function returns `0` instead of failing.
#[inline(always)]
fn now_us() -> i64 {
    let micros: u128 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_micros();
    // `as_micros` yields a `u128`; clamp before narrowing so an out-of-range
    // value saturates at `i64::MAX` instead of wrapping to a negative number.
    micros.min(i64::MAX as u128) as i64
}
36
/// 64-bit FNV-1a hash of the UTF-8 bytes of `name`, reinterpreted as `i64`.
///
/// The value is what this file stores in and looks up through the
/// `entity.name_hash` column, so the algorithm and constants must not change.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let digest = name.bytes().fold(OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    });
    digest as i64
}
46
47fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
48    let mut stmt = conn
49        .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
50        .map_err(sqlite_err)?;
51    let rows = stmt
52        .query_map(params![entity_id], |row| row.get::<_, String>(0))
53        .map_err(sqlite_err)?
54        .filter_map(|r| r.ok())
55        .collect::<Vec<_>>();
56    Ok(rows)
57}
58
59fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
60    load_observations(conn, entity_id).unwrap_or_default()
61}
62
63fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
64    let h = name_hash(name);
65    let mut stmt = conn
66        .prepare_cached(
67            "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
68        )
69        .map_err(sqlite_err)?;
70    match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
71        Ok(id) => Ok(Some(id)),
72        Err(e) if is_not_found(&e) => Ok(None),
73        Err(e) => Err(sqlite_err(e)),
74    }
75}
76
77fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
78    let mut sel = conn
79        .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
80        .map_err(sqlite_err)?;
81    if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
82        return Ok(id);
83    }
84    conn.execute(
85        "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
86        params![kind, type_name],
87    )
88    .map_err(sqlite_err)?;
89    Ok(conn.last_insert_rowid())
90}
91
92/// Read-only type lookup. Unlike [`get_type_id`] this never inserts, so it is
93/// safe to call on a `query_only` reader connection. Returns `None` when the
94/// type does not exist.
95fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
96    conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
97        .ok()?
98        .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
99        .ok()
100}
101
102fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
103    conn.execute(
104        "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
105        params![delta, type_id],
106    )
107    .map_err(sqlite_err)?;
108    Ok(())
109}
110
111fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
112    conn.execute(
113        "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
114        params![delta, key],
115    )
116    .map_err(sqlite_err)?;
117    Ok(())
118}
119
120fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
121    conn.query_row(
122        "SELECT value FROM graph_stat WHERE key = ?1",
123        params![key],
124        |row| row.get(0),
125    )
126    .map_err(sqlite_err)
127}
128
129fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
130    conn.query_row(
131        "SELECT name FROM type_dict WHERE id = ?1",
132        params![type_id],
133        |row| row.get(0),
134    )
135    .map_err(sqlite_err)
136}
137
138fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
139    let mut stmt = conn
140        .prepare_cached(
141            "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
142        )
143        .map_err(sqlite_err)?;
144    let rows = stmt
145        .query_map(params![kind], |row| {
146            Ok((
147                row.get::<_, String>(0)?,
148                row.get::<_, i64>(1)? as usize,
149            ))
150        })
151        .map_err(sqlite_err)?
152        .filter_map(|r| r.ok())
153        .collect();
154    Ok(rows)
155}
156
157fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
158    let mut stmt = conn
159        .prepare_cached(
160            "SELECT e.name, t.name,
161               COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
162             FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
163        )
164        .map_err(sqlite_err)?;
165    let (name, etype, obs_json): (String, String, String) = stmt
166        .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
167        .map_err(sqlite_err)?;
168    let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
169    Ok(Entity {
170        name,
171        entity_type: etype,
172        observations,
173    })
174}
175
/// Direction of relation traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Selected by the exact string `"OUTGOING"`.
    Outgoing,
    /// Selected by the exact string `"INCOMING"`.
    Incoming,
    /// The default for a missing or unrecognized value.
    Both,
}

impl Direction {
    /// Parses an optional direction string. Matching is exact and
    /// case-sensitive; `None` and any unrecognized text map to
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        if s == Some("OUTGOING") {
            Self::Outgoing
        } else if s == Some("INCOMING") {
            Self::Incoming
        } else {
            Self::Both
        }
    }
}
193
/// Appends `raw` to `buf` as a quoted JSON string literal.
///
/// Writes straight into the caller's buffer, so no temporary
/// `serde_json::Value` is needed for the JSON-RPC wrapper.
///
/// `"` and `\` are backslash-escaped, the control characters with a short
/// form use it (`\n`, `\r`, `\t`, `\b`, `\f`), and every other byte below
/// `0x20` is written as `\u00XX`. All remaining text, including multi-byte
/// UTF-8, is copied through unchanged.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the pending run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        // `Some(c)` => short escape `\c`; `None` => `\u00XX`; anything else is
        // left in the pending run.
        let short = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            0x00..=0x1F => None,
            _ => continue,
        };
        // Every escaped byte is ASCII, so `i` and `i + 1` are char boundaries
        // and slicing `raw` here cannot split a UTF-8 sequence.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the six-character `\u00XX` escape for byte `b` (lowercase hex).
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail.
    write!(buf, "\\u{:04x}", b).unwrap();
}
234
235// ── MetaCache ────────────────────────────────────────────────────────────
236
/// Per-entity metadata kept in the name-keyed LRU cache of `GraphHandle`.
///
/// The fields mirror the `entity` table columns of the same names.
#[derive(Copy, Clone)]
struct EntityMeta {
    /// Row id of the entity (`entity.id`).
    id: i64,
    /// Id of the entity's type in `type_dict` (`entity.type_id`).
    type_id: i64,
    /// Value of `entity.obs_count` when the entry was cached or created.
    obs_count: i64,
    /// Value of `entity.out_deg` when the entry was cached or created.
    out_deg: i64,
    /// Value of `entity.in_deg` when the entry was cached or created.
    in_deg: i64,
}
245
246// ── Transaction guard (RAII rollback on error) ─────────────────────────
247
248struct TxGuard<'a> {
249    conn: &'a Connection,
250    done: bool,
251}
252
253impl<'a> TxGuard<'a> {
254    fn begin(conn: &'a Connection) -> Result<Self> {
255        // BEGIN IMMEDIATE acquires the WAL write lock up front rather than
256        // lazily on the first write. This makes the busy-timeout apply to lock
257        // acquisition deterministically and avoids `SQLITE_BUSY_SNAPSHOT`
258        // surprises when readers are concurrently active.
259        conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
260        Ok(Self { conn, done: false })
261    }
262
263    fn commit(mut self) -> Result<()> {
264        self.done = true;
265        self.conn.execute_batch("COMMIT").map_err(sqlite_err)
266    }
267}
268
269impl Drop for TxGuard<'_> {
270    fn drop(&mut self) {
271        if !self.done {
272            let _ = self.conn.execute_batch("ROLLBACK");
273        }
274    }
275}
276
277// ── Reader pool ───────────────────────────────────────────────────────────
278
279/// A small fixed pool of `query_only` SQLite connections used for read
280/// operations. WAL mode permits any number of concurrent readers alongside the
281/// single writer, so spreading reads across several connections lets them run
282/// in parallel instead of serializing on the writer's mutex.
283struct ReaderPool {
284    conns: Vec<Mutex<Connection>>,
285    next: AtomicUsize,
286}
287
288impl ReaderPool {
289    /// Acquire a reader connection. Fast path: grab the first idle one. If every
290    /// connection is busy, block on a round-robin pick so callers still make
291    /// progress (and never spin).
292    fn get(&self) -> MutexGuard<'_, Connection> {
293        for c in &self.conns {
294            if let Some(g) = c.try_lock() {
295                return g;
296            }
297        }
298        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
299        self.conns[i].lock()
300    }
301}
302
303// ── GraphHandle ──────────────────────────────────────────────────────────
304
/// Handle to the SQLite-backed graph store: one writer connection, a pool of
/// reader connections, in-memory id counters and an entity-metadata cache.
pub struct GraphHandle {
    /// The single read-write connection. SQLite allows only one writer, so all
    /// mutations serialize here.
    writer: Mutex<Connection>,
    /// Pool of `query_only` connections for concurrent reads (WAL).
    readers: ReaderPool,
    /// Last entity id handed out. Seeded from the `entity_seq` row of
    /// `graph_stat` in `new` and written back by `sync_seqs`.
    seq_entity: AtomicI64,
    /// Last observation id handed out. Seeded from the `obs_seq` row of
    /// `graph_stat` in `new` and written back by `sync_seqs`.
    seq_obs: AtomicI64,
    /// LRU cache of [`EntityMeta`] keyed by entity name.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
315
316/// Open one `query_only` reader connection against an existing WAL database.
317///
318/// The connection is opened read-write at the OS level (so it can attach to the
319/// `-shm` wal-index — SQLite cannot read a WAL database through a pure
320/// `SQLITE_OPEN_READ_ONLY` handle) and then locked to reads with
321/// `PRAGMA query_only = ON`, which makes any accidental write error out.
322fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
323    let conn = Connection::open_with_flags(
324        path,
325        OpenFlags::SQLITE_OPEN_READ_WRITE
326            | OpenFlags::SQLITE_OPEN_NO_MUTEX
327            | OpenFlags::SQLITE_OPEN_URI,
328    )
329    .map_err(sqlite_err)?;
330    conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
331        .map_err(sqlite_err)?;
332    conn.execute_batch(&format!(
333        "PRAGMA query_only   = ON;
334         PRAGMA cache_size   = -{};
335         PRAGMA temp_store   = MEMORY;
336         PRAGMA mmap_size    = {};",
337        tuning.cache_size_kb, tuning.mmap_size
338    ))
339    .map_err(sqlite_err)?;
340    Ok(conn)
341}
342
343impl GraphHandle {
344    pub fn new(
345        path: &Path,
346        durability: Durability,
347        tuning: SqliteTuning,
348        lru_cache_size: NonZeroUsize,
349        read_pool_size: usize,
350    ) -> Result<Self> {
351        let conn = Connection::open(path).map_err(sqlite_err)?;
352        // Apply the busy handler through the API so it is in force for every
353        // subsequent statement (including schema creation and BEGIN IMMEDIATE).
354        conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
355            .map_err(sqlite_err)?;
356
357        // `page_size` and `auto_vacuum` are fixed when the database first gets
358        // content, and `page_size` additionally must precede `journal_mode=WAL`.
359        // Set both up front on this connection, before any table is created, so
360        // they take effect on a fresh database. On an existing database they are
361        // silently ignored (would require VACUUM to change).
362        conn.execute_batch(&format!(
363            "PRAGMA page_size    = {};
364             PRAGMA auto_vacuum  = INCREMENTAL;",
365            tuning.page_size
366        ))
367        .map_err(sqlite_err)?;
368
369        conn.execute_batch(&format!(
370             "PRAGMA journal_mode = WAL;
371             PRAGMA foreign_keys = OFF;
372             PRAGMA cache_size    = -{};
373             PRAGMA temp_store    = MEMORY;
374             PRAGMA busy_timeout  = {};
375             PRAGMA synchronous   = NORMAL;
376             PRAGMA journal_size_limit = {};",
377            tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
378        ))
379        .map_err(sqlite_err)?;
380
381        conn.execute_batch(
382             "CREATE TABLE IF NOT EXISTS entity (
383                 id          INTEGER PRIMARY KEY,
384                 name_hash   INTEGER NOT NULL,
385                 name        TEXT    NOT NULL,
386                 type_id     INTEGER NOT NULL,
387                 obs_count   INTEGER NOT NULL DEFAULT 0,
388                 out_deg     INTEGER NOT NULL DEFAULT 0,
389                 in_deg      INTEGER NOT NULL DEFAULT 0,
390                 created_us  INTEGER NOT NULL,
391                 updated_us  INTEGER NOT NULL,
392                 flags       INTEGER NOT NULL DEFAULT 0
393             ) STRICT;
394
395             CREATE INDEX IF NOT EXISTS entity_by_hash
396                 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
397                 WHERE flags = 0;
398
399             CREATE INDEX IF NOT EXISTS entity_name_ci
400                 ON entity(lower(name))
401                 WHERE flags = 0;
402
403             CREATE TABLE IF NOT EXISTS observation (
404                 id          INTEGER PRIMARY KEY,
405                 entity_id   INTEGER NOT NULL,
406                 idx         INTEGER NOT NULL,
407                 body        TEXT    NOT NULL,
408                 created_us  INTEGER NOT NULL
409             ) STRICT;
410
411             CREATE INDEX IF NOT EXISTS obs_by_entity
412                 ON observation(entity_id, idx);
413
414             CREATE TABLE IF NOT EXISTS relation (
415                 from_id     INTEGER NOT NULL,
416                 to_id       INTEGER NOT NULL,
417                 type_id     INTEGER NOT NULL,
418                 created_us  INTEGER NOT NULL
419             ) STRICT;
420
421             CREATE INDEX IF NOT EXISTS rel_out
422                 ON relation(from_id, type_id, to_id);
423
424             CREATE INDEX IF NOT EXISTS rel_in
425                 ON relation(to_id, type_id, from_id);
426
427             CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
428                 USING fts5(name, content='entity', content_rowid='id',
429                            tokenize='unicode61 remove_diacritics 2');
430
431             CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
432                 USING fts5(body, content='observation', content_rowid='id',
433                            tokenize='unicode61 remove_diacritics 2');
434
435             CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
436               INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
437             END;
438
439             CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
440               INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
441             END;
442
443             CREATE TABLE IF NOT EXISTS type_dict (
444                 id     INTEGER PRIMARY KEY,
445                 kind   INTEGER NOT NULL,
446                 name   TEXT    NOT NULL,
447                 count  INTEGER NOT NULL DEFAULT 0
448             ) STRICT;
449
450             CREATE INDEX IF NOT EXISTS type_by_name
451                 ON type_dict(kind, name);
452
453             CREATE TABLE IF NOT EXISTS graph_stat (
454                 key    TEXT NOT NULL PRIMARY KEY,
455                 value  INTEGER NOT NULL
456             ) STRICT, WITHOUT ROWID;
457
458             CREATE TABLE IF NOT EXISTS hub_degree (
459                 entity_id INTEGER PRIMARY KEY,
460                 out_deg   INTEGER NOT NULL,
461                 in_deg    INTEGER NOT NULL
462             ) STRICT;
463
464             CREATE TABLE IF NOT EXISTS partition_map (
465                 table_name TEXT NOT NULL PRIMARY KEY,
466                 role       INTEGER NOT NULL,
467                 type_id    INTEGER,
468                 row_count  INTEGER NOT NULL DEFAULT 0
469             ) STRICT, WITHOUT ROWID;",
470        )
471        .map_err(sqlite_err)?;
472
473        conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
474            .map_err(sqlite_err)?;
475
476        let sync_pragma = match durability {
477            Durability::Sync => "PRAGMA synchronous = FULL",
478            Durability::Async => "PRAGMA synchronous = NORMAL",
479        };
480        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
481
482        // Bound the cost of `PRAGMA optimize` (here and in maintenance) so a
483        // large database cannot stall startup/maintenance analyzing every index.
484        conn.execute_batch("PRAGMA analysis_limit = 400;")
485            .map_err(sqlite_err)?;
486
487        let has_stat: bool = conn
488            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
489            .is_ok();
490        if !has_stat {
491            conn.execute_batch(
492                "INSERT INTO graph_stat(key, value) VALUES
493                 ('entities', 0), ('relations', 0), ('observations', 0),
494                 ('entity_seq', 0), ('obs_seq', 0);",
495            )
496            .map_err(sqlite_err)?;
497        }
498
499        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
500
501        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
502        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
503
504        // Open the reader pool against the now-initialized database. At least one
505        // reader is always created.
506        let pool_size = read_pool_size.max(1);
507        let mut conns = Vec::with_capacity(pool_size);
508        for _ in 0..pool_size {
509            conns.push(Mutex::new(open_reader(path, &tuning)?));
510        }
511        let readers = ReaderPool {
512            conns,
513            next: AtomicUsize::new(0),
514        };
515
516        Ok(Self {
517            writer: Mutex::new(conn),
518            readers,
519            seq_entity: AtomicI64::new(seq_entity),
520            seq_obs: AtomicI64::new(seq_obs),
521            cache: Mutex::new(LruCache::new(lru_cache_size)),
522        })
523    }
524
525    fn next_entity_id(&self) -> i64 {
526        self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
527    }
528
529    fn next_obs_id(&self) -> i64 {
530        self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
531    }
532
533    fn meta_get(&self, name: &str) -> Option<EntityMeta> {
534        self.cache.lock().get(name).copied()
535    }
536
537    fn meta_set(&self, name: &str, m: EntityMeta) {
538        self.cache.lock().put(name.to_string(), m);
539    }
540
541    fn meta_remove(&self, name: &str) {
542        self.cache.lock().pop(name);
543    }
544
545    fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
546        let mut cache = self.cache.lock();
547        if let Some(m) = cache.get_mut(name) {
548            f(m);
549        }
550    }
551
552    fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
553        if let Some(m) = self.meta_get(name) {
554            return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
555        }
556        let h = name_hash(name);
557        let mut stmt = conn
558            .prepare_cached(
559                "SELECT id, type_id, obs_count, out_deg, in_deg
560                 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
561            )
562            .map_err(sqlite_err)?;
563        match stmt.query_row(params![h, name], |row| {
564            Ok(EntityMeta {
565                id: row.get(0)?,
566                type_id: row.get(1)?,
567                obs_count: row.get(2)?,
568                out_deg: row.get(3)?,
569                in_deg: row.get(4)?,
570            })
571        }) {
572            Ok(m) => {
573                self.meta_set(name, m);
574                Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
575            }
576            Err(e) if is_not_found(&e) => Ok(None),
577            Err(e) => Err(sqlite_err(e)),
578        }
579    }
580
581    fn sync_seqs(&self, conn: &Connection) -> Result<()> {
582        let seq_e = self.seq_entity.load(Ordering::Relaxed);
583        let seq_o = self.seq_obs.load(Ordering::Relaxed);
584        conn.execute(
585            "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
586             WHERE key IN ('entity_seq', 'obs_seq')",
587            params![seq_e, seq_o],
588        )
589        .map_err(sqlite_err)?;
590        Ok(())
591    }
592
593    // ── Public API ──────────────────────────────────────────────────────
594
595    pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
596        if name.is_empty() {
597            return Ok(None);
598        }
599
600        if let Some(m) = self.meta_get(name) {
601            let conn = self.readers.get();
602            let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
603            let observations = load_observations_opt(&conn, m.id);
604            return Ok(Some(Entity {
605                name: name.to_string(),
606                entity_type: etype,
607                observations,
608            }));
609        }
610
611        let conn = self.readers.get();
612        let h = name_hash(name);
613        let mut stmt = conn
614            .prepare_cached(
615                "SELECT e.id, e.type_id, e.name, t.name,
616                        e.obs_count, e.out_deg, e.in_deg
617                 FROM entity e
618                 JOIN type_dict t ON t.id = e.type_id
619                 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
620            )
621            .map_err(sqlite_err)?;
622        match stmt.query_row(params![h, name], |row| {
623            let id: i64 = row.get(0)?;
624            let type_id: i64 = row.get(1)?;
625            let ename: String = row.get(2)?;
626            let etype: String = row.get(3)?;
627            let obs_count: i64 = row.get(4)?;
628            let out_deg: i64 = row.get(5)?;
629            let in_deg: i64 = row.get(6)?;
630            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
631        }) {
632            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
633                let observations = load_observations_opt(&conn, id);
634                drop(stmt);
635                drop(conn);
636                self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
637                Ok(Some(Entity {
638                    name: ename,
639                    entity_type: etype,
640                    observations,
641                }))
642            }
643            Err(e) if is_not_found(&e) => Ok(None),
644            Err(e) => Err(sqlite_err(e)),
645        }
646    }
647
648    pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
649        let conn = self.writer.lock();
650        let tx = TxGuard::begin(&conn)?;
651
652        let mut ins_ent = conn
653            .prepare_cached(
654                "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
655                 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
656                 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
657            )
658            .map_err(sqlite_err)?;
659
660        let mut ins_fts = conn
661            .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
662            .map_err(sqlite_err)?;
663
664        let batch_ts = now_us();
665        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
666        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
667        let mut total_entities: i64 = 0;
668        let mut total_obs: i64 = 0;
669        let mut created = Vec::new();
670        let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
671        let mut obs_sql = String::new();
672
673        for entity in entities {
674            if entity.name.is_empty() {
675                continue;
676            }
677            let h = name_hash(&entity.name);
678            let id = self.next_entity_id();
679            let type_id = match type_cache.get(entity.entity_type.as_str()) {
680                Some(t) => *t,
681                None => {
682                    let t = get_type_id(&conn, &entity.entity_type, 0)?;
683                    type_cache.insert(entity.entity_type.clone(), t);
684                    t
685                }
686            };
687            let obs_count = entity.observations.len() as i64;
688
689            let changed = ins_ent
690                .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
691                .map_err(sqlite_err)?;
692            if changed == 0 {
693                continue;
694            }
695
696            let n = entity.observations.len();
697            if n > 0 {
698                obs_sql.clear();
699
700                let mut oids = Vec::with_capacity(n);
701                let mut idxs = Vec::with_capacity(n);
702                for _ in 0..n {
703                    oids.push(self.next_obs_id());
704                }
705                for i in 0..n as i64 {
706                    idxs.push(i);
707                }
708
709                obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
710                for i in 0..n {
711                    if i > 0 { obs_sql.push(','); }
712                    obs_sql.push_str("(?,?,?,?,?)");
713                }
714
715                let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
716                for (i, obs) in entity.observations.iter().enumerate() {
717                    obs_params.push(&oids[i]);
718                    obs_params.push(&id);
719                    obs_params.push(&idxs[i]);
720                    obs_params.push(obs);
721                    obs_params.push(&batch_ts);
722                }
723
724                conn.execute(&obs_sql, obs_params.as_slice())
725                    .map_err(sqlite_err)?;
726            }
727
728            ins_fts
729                .execute(params![id, entity.name])
730                .map_err(sqlite_err)?;
731
732            *type_deltas.entry(type_id).or_insert(0) += 1;
733            total_entities += 1;
734            total_obs += obs_count;
735
736            created.push(entity.clone());
737            created_metas.push((entity.name.clone(), EntityMeta {
738                id,
739                type_id,
740                obs_count,
741                out_deg: 0,
742                in_deg: 0,
743            }));
744        }
745
746        if total_entities > 0 {
747            for (type_id, delta) in &type_deltas {
748                inc_type_count(&conn, *type_id, *delta)?;
749            }
750            inc_graph_stat(&conn, "entities", total_entities)?;
751            inc_graph_stat(&conn, "observations", total_obs)?;
752            self.sync_seqs(&conn)?;
753        }
754
755        tx.commit()?;
756
757        // Note: `PRAGMA optimize` is intentionally *not* run here. It analyzes
758        // indexes and writes to internal stat tables — pure overhead on the
759        // write hot path. The periodic `run_maintenance` task covers it.
760
761        if !created_metas.is_empty() {
762            let mut cache = self.cache.lock();
763            for (name, meta) in &created_metas {
764                cache.put(name.clone(), *meta);
765            }
766        }
767
768        Ok(created)
769    }
770
771    pub fn delete_entities(&self, names: &[String]) -> Result<()> {
772        if names.is_empty() {
773            return Ok(());
774        }
775        let conn = self.writer.lock();
776
777        // Phase 1: Resolve all names to (id, type_id).
778        let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
779        let mut sel = conn
780            .prepare_cached(
781                "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
782            )
783            .map_err(sqlite_err)?;
784        for name in names {
785            let h = name_hash(name);
786            let (id, type_id) = match sel.query_row(params![h, name], |row| {
787                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
788            }) {
789                Ok(v) => v,
790                Err(e) if is_not_found(&e) => continue,
791                Err(e) => return Err(sqlite_err(e)),
792            };
793            resolved.push((id, type_id, name.clone()));
794        }
795
796        if resolved.is_empty() {
797            return Ok(());
798        }
799
800        let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
801        let n = ids.len();
802
803        // Phase 2: Batch DELETE observations.
804        let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
805        let obs_sql = format!(
806            "DELETE FROM observation WHERE entity_id IN ({})",
807            obs_p.join(",")
808        );
809        let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
810        let obs_deleted = conn
811            .execute(&obs_sql, obs_refs.as_slice())
812            .map_err(sqlite_err)? as i64;
813
814        // Phase 3: Batch DELETE relations.
815        let rel_sql = format!(
816            "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
817            obs_p.join(","),
818            obs_p.join(",")
819        );
820        let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
821        let rel_deleted = conn
822            .execute(&rel_sql, rel_refs.as_slice())
823            .map_err(sqlite_err)? as i64;
824
825        // Phase 4: Batch FTS deletes.
826        let fts_values: Vec<String> = (0..n)
827            .map(|_| "('delete', ?, '')".to_string())
828            .collect();
829        let fts_sql = format!(
830            "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
831            fts_values.join(", ")
832        );
833        conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
834            .map_err(sqlite_err)?;
835
836        // Aggregate type count deltas.
837        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
838        for &(_, type_id, _) in &resolved {
839            *type_deltas.entry(type_id).or_insert(0) += 1;
840        }
841
842        // Phase 5: Batch type count decrements.
843        if !type_deltas.is_empty() {
844            let m = type_deltas.len();
845            let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
846            let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
847            let mut case_parts: Vec<String> = Vec::with_capacity(m);
848            let mut id_parts: Vec<String> = Vec::with_capacity(m);
849            for i in 0..m {
850                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
851                id_parts.push(format!("?{}", i + 1));
852            }
853            let sql = format!(
854                "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
855                case_parts.join(" "),
856                id_parts.join(","),
857            );
858            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
859            for id in &type_keys {
860                params.push(Box::new(*id));
861            }
862            for delta in &type_vals {
863                params.push(Box::new(*delta));
864            }
865            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
866            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
867        }
868
869        // Phase 6: Batch DELETE entities.
870        conn.execute(
871            &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
872            ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
873        )
874        .map_err(sqlite_err)?;
875
876        // Phase 7: Update stats.
877        inc_graph_stat(&conn, "entities", -(n as i64))?;
878        inc_graph_stat(&conn, "observations", -obs_deleted)?;
879        inc_graph_stat(&conn, "relations", -rel_deleted)?;
880
881        // Phase 8: Remove from cache.
882        for (_, _, name) in &resolved {
883            self.meta_remove(name);
884        }
885
886        Ok(())
887    }
888
889    /// Remove a `code:file` entity and every symbol it `defines`, so the file
890    /// can be re-indexed cleanly. Relations touching the removed entities are
891    /// cascaded by [`Self::delete_entities`]. Returns the number of entities
892    /// removed (file + symbols).
893    #[cfg(feature = "code")]
894    pub fn code_purge_file(&self, rel_path: &str) -> Result<usize> {
895        let defines = self.search_relations(Some(rel_path), None, Some("defines"), Some(crate::code::MAX_SYMBOLS_PER_FILE));
896        let mut names: Vec<String> = defines.into_iter().map(|r| r.to).collect();
897        names.push(rel_path.to_string());
898        let n = names.len();
899        self.delete_entities(&names)?;
900        Ok(n)
901    }
902
903    pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
904        let conn = self.writer.lock();
905        let tx = TxGuard::begin(&conn)?;
906
907        let mut ins = conn
908            .prepare_cached(
909                "INSERT INTO relation (from_id, to_id, type_id, created_us)
910                 SELECT ?1, ?2, ?3, ?4
911                 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
912            )
913            .map_err(sqlite_err)?;
914
915        let ts = now_us();
916        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
917        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
918        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
919        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
920        let mut total_relations: i64 = 0;
921        let mut created = Vec::new();
922
923        for rel in relations {
924            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
925                Some(v) => v,
926                None => continue,
927            };
928            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
929                Some(v) => v,
930                None => continue,
931            };
932            let type_id = match type_cache.get(rel.relation_type.as_str()) {
933                Some(t) => *t,
934                None => {
935                    let t = get_type_id(&conn, &rel.relation_type, 1)?;
936                    type_cache.insert(rel.relation_type.clone(), t);
937                    t
938                }
939            };
940
941            let changed = ins
942                .execute(params![from_id, to_id, type_id, ts])
943                .map_err(sqlite_err)?;
944            if changed == 0 {
945                continue;
946            }
947
948            *out_deltas.entry(from_id).or_insert(0) += 1;
949            *in_deltas.entry(to_id).or_insert(0) += 1;
950            *type_deltas.entry(type_id).or_insert(0) += 1;
951            total_relations += 1;
952
953            created.push(rel.clone());
954        }
955
956        if total_relations > 0 {
957            for (id, delta) in &out_deltas {
958                conn.execute(
959                    "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
960                    params![delta, id],
961                )
962                .map_err(sqlite_err)?;
963            }
964            for (id, delta) in in_deltas {
965                conn.execute(
966                    "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
967                    params![delta, id],
968                )
969                .map_err(sqlite_err)?;
970            }
971            for (type_id, delta) in &type_deltas {
972                inc_type_count(&conn, *type_id, *delta)?;
973            }
974            inc_graph_stat(&conn, "relations", total_relations)?;
975        }
976
977        tx.commit()?;
978
979        // See `create_entities`: `PRAGMA optimize` is deferred to maintenance.
980
981        if !created.is_empty() {
982            let mut cache = self.cache.lock();
983            for rel in &created {
984                if let Some(m) = cache.get_mut(&rel.from) {
985                    m.out_deg += 1;
986                }
987                if let Some(m) = cache.get_mut(&rel.to) {
988                    m.in_deg += 1;
989                }
990            }
991        }
992
993        Ok(created)
994    }
995
996    pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
997        if relations.is_empty() {
998            return Ok(());
999        }
1000        let conn = self.writer.lock();
1001
1002        // Resolve names to IDs and collect valid triples.
1003        let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
1004        let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
1005        for rel in relations {
1006            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
1007                Some(v) => v,
1008                None => continue,
1009            };
1010            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
1011                Some(v) => v,
1012                None => continue,
1013            };
1014            let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
1015                Ok(id) => id,
1016                Err(_) => continue,
1017            };
1018            triples.push((from_id, to_id, type_id));
1019            names.push((rel.from.clone(), rel.to.clone()));
1020        }
1021
1022        if triples.is_empty() {
1023            return Ok(());
1024        }
1025
1026        // Batch DELETE using VALUES subquery.
1027        let mut sql = String::from(
1028            "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1029        );
1030        for (i, _) in triples.iter().enumerate() {
1031            if i > 0 {
1032                sql.push_str(", ");
1033            }
1034            let base = (i * 3) + 1;
1035            sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1036        }
1037        sql.push(')');
1038
1039        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1040        for &(f, t, tp) in &triples {
1041            param_values.push(Box::new(f));
1042            param_values.push(Box::new(t));
1043            param_values.push(Box::new(tp));
1044        }
1045        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1046        let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1047        if total == 0 {
1048            return Ok(());
1049        }
1050
1051        // Aggregate degree and type deltas.
1052        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1053        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1054        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1055        for &(from_id, to_id, type_id) in &triples {
1056            *out_deltas.entry(from_id).or_insert(0) += 1;
1057            *in_deltas.entry(to_id).or_insert(0) += 1;
1058            *type_deltas.entry(type_id).or_insert(0) += 1;
1059        }
1060
1061        // Batch out_deg updates.
1062        let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1063        let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1064        if !out_keys.is_empty() {
1065            let m = out_keys.len();
1066            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1067            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1068            for i in 0..m {
1069                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1070                id_parts.push(format!("?{}", i + 1));
1071            }
1072            let sql = format!(
1073                "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1074                case_parts.join(" "),
1075                id_parts.join(","),
1076            );
1077            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1078            for id in &out_keys {
1079                params.push(Box::new(*id));
1080            }
1081            for delta in &out_vals {
1082                params.push(Box::new(*delta));
1083            }
1084            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1085            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1086        }
1087
1088        // Batch in_deg updates.
1089        let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1090        let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1091        if !in_keys.is_empty() {
1092            let m = in_keys.len();
1093            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1094            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1095            for i in 0..m {
1096                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1097                id_parts.push(format!("?{}", i + 1));
1098            }
1099            let sql = format!(
1100                "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1101                case_parts.join(" "),
1102                id_parts.join(","),
1103            );
1104            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1105            for id in &in_keys {
1106                params.push(Box::new(*id));
1107            }
1108            for delta in &in_vals {
1109                params.push(Box::new(*delta));
1110            }
1111            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1112            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1113        }
1114
1115        // Batch type_dict updates.
1116        let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1117        let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1118        if !type_keys.is_empty() {
1119            let m = type_keys.len();
1120            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1121            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1122            for i in 0..m {
1123                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1124                id_parts.push(format!("?{}", i + 1));
1125            }
1126            let sql = format!(
1127                "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1128                case_parts.join(" "),
1129                id_parts.join(","),
1130            );
1131            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1132            for id in &type_keys {
1133                params.push(Box::new(*id));
1134            }
1135            for delta in &type_vals {
1136                params.push(Box::new(*delta));
1137            }
1138            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1139            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1140        }
1141
1142        inc_graph_stat(&conn, "relations", -(total as i64))?;
1143
1144        // Update cache for resolved triples (self-heals on next reload if
1145        // a triple happened to not match).
1146        for (from, to) in &names {
1147            self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1148            self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1149        }
1150
1151        Ok(())
1152    }
1153
1154    pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1155        let conn = self.writer.lock();
1156        let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1157            Some(v) => v,
1158            None => {
1159                return Err(MCSError::InvalidParams(format!(
1160                    "Entity '{entity_name}' not found"
1161                )))
1162            }
1163        };
1164
1165        let mut max_idx: i64 = conn
1166            .query_row(
1167                "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1168                params![id],
1169                |row| row.get(0),
1170            )
1171            .map_err(sqlite_err)?;
1172
1173        let ts = now_us();
1174        let mut ins_obs = conn
1175            .prepare_cached(
1176                "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1177            )
1178            .map_err(sqlite_err)?;
1179
1180        for content in contents {
1181            max_idx += 1;
1182            let oid = self.next_obs_id();
1183            ins_obs
1184                .execute(params![oid, id, max_idx, content, ts])
1185                .map_err(sqlite_err)?;
1186        }
1187        let added = contents.to_vec();
1188
1189        let count: i64 = contents.len() as i64;
1190        conn.execute(
1191            "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1192            params![count, ts, id],
1193        )
1194        .map_err(sqlite_err)?;
1195
1196        inc_graph_stat(&conn, "observations", count)?;
1197        self.sync_seqs(&conn)?;
1198
1199        self.meta_update(entity_name, |m| m.obs_count += count);
1200
1201        Ok(added)
1202    }
1203
1204    pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1205        if observations.is_empty() {
1206            return Ok(());
1207        }
1208        let conn = self.writer.lock();
1209        let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1210            Some(v) => v,
1211            None => {
1212                return Err(MCSError::InvalidParams(format!(
1213                    "Entity '{entity_name}' not found"
1214                )))
1215            }
1216        };
1217
1218        let placeholders: Vec<String> = (0..observations.len())
1219            .map(|i| format!("?{}", i + 2))
1220            .collect();
1221        let sql = format!(
1222            "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1223            placeholders.join(",")
1224        );
1225
1226        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1227        param_values.push(Box::new(id));
1228        for obs in observations {
1229            param_values.push(Box::new(obs.as_str()));
1230        }
1231        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1232        let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1233
1234        if removed > 0 {
1235            conn.execute(
1236                "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1237                params![removed, now_us(), id],
1238            )
1239            .map_err(sqlite_err)?;
1240            inc_graph_stat(&conn, "observations", -removed)?;
1241
1242            self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1243        }
1244
1245        Ok(())
1246    }
1247
1248    pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1249        let mut results = Vec::new();
1250        for entity in entities {
1251            if let Some(existing) = self.get_entity(&entity.name)? {
1252                // Update type if different.
1253                if existing.entity_type != entity.entity_type {
1254                    let conn = self.writer.lock();
1255                    let old_type_id = conn
1256                        .query_row(
1257                            "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1258                            params![name_hash(&entity.name), entity.name],
1259                            |row| row.get::<_, i64>(0),
1260                        )
1261                        .map_err(sqlite_err)?;
1262                    let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1263                    inc_type_count(&conn, old_type_id, -1)?;
1264                    inc_type_count(&conn, new_type_id, 1)?;
1265                    conn.execute(
1266                        "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1267                        params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1268                    )
1269                    .map_err(sqlite_err)?;
1270                    // Invalidate cache so subsequent get_entity reloads meta.
1271                    self.meta_remove(&entity.name);
1272                }
1273                // Merge observations (append new ones not already present).
1274                let existing_set: HashSet<&str> =
1275                    existing.observations.iter().map(|s| s.as_str()).collect();
1276                let to_add: Vec<String> = entity
1277                    .observations
1278                    .iter()
1279                    .filter(|o| !existing_set.contains(o.as_str()))
1280                    .cloned()
1281                    .collect();
1282                if !to_add.is_empty() {
1283                    self.add_observations(&entity.name, &to_add)?;
1284                }
1285                let updated = self
1286                    .get_entity(&entity.name)?
1287                    .unwrap_or(entity.clone());
1288                results.push(updated);
1289            } else {
1290                let c = self.create_entities(std::slice::from_ref(entity))?;
1291                if let Some(e) = c.into_iter().next() {
1292                    results.push(e);
1293                }
1294            }
1295        }
1296        Ok(results)
1297    }
1298
1299    pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1300        let conn = self.writer.lock();
1301        let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1302            Some(v) => v,
1303            None => {
1304                return Err(MCSError::InvalidParams(format!(
1305                    "Source entity '{source}' not found"
1306                )))
1307            }
1308        };
1309        let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1310            Some(v) => v,
1311            None => {
1312                return Err(MCSError::InvalidParams(format!(
1313                    "Target entity '{target}' not found"
1314                )))
1315            }
1316        };
1317
1318        if src_id == tgt_id {
1319            return self.get_entity(target)?.ok_or_else(|| {
1320                MCSError::InvalidParams("Target entity not found after merge".into())
1321            });
1322        }
1323
1324        // Move observations from source to target.
1325        let mut obs_count: i64 = 0;
1326        {
1327            let mut max_idx: i64 = conn
1328                .query_row(
1329                    "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1330                    params![tgt_id],
1331                    |row| row.get(0),
1332                )
1333                .map_err(sqlite_err)?;
1334            let mut sel_obs = conn
1335                .prepare_cached(
1336                    "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1337                )
1338                .map_err(sqlite_err)?;
1339            let mut upd_obs = conn
1340                .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1341                .map_err(sqlite_err)?;
1342            let rows: Vec<(i64, String)> = sel_obs
1343                .query_map(params![src_id], |row| {
1344                    Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1345                })
1346                .map_err(sqlite_err)?
1347                .filter_map(|r| r.ok())
1348                .collect();
1349            for (oid, _body) in &rows {
1350                max_idx += 1;
1351                upd_obs
1352                    .execute(params![tgt_id, max_idx, oid])
1353                    .map_err(sqlite_err)?;
1354                obs_count += 1;
1355            }
1356        }
1357
1358        // Move relations from source to target.
1359        conn.execute(
1360            "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1361            params![tgt_id, src_id],
1362        )
1363        .map_err(sqlite_err)?;
1364        conn.execute(
1365            "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1366            params![tgt_id, src_id],
1367        )
1368        .map_err(sqlite_err)?;
1369        // Delete orphaned relations that were updated by the above (the "OR IGNORE"
1370        // keeps the first, but we still have the original row with the old id? No —
1371        // UPDATE OR IGNORE won't remove. So we must delete any that still reference src_id.)
1372        conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1373            .map_err(sqlite_err)?;
1374        conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1375            .map_err(sqlite_err)?;
1376
1377        // Update degrees on target.
1378        let out_add: i64 = conn
1379            .query_row(
1380                "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1381                params![tgt_id],
1382                |row| row.get(0),
1383            )
1384            .map_err(sqlite_err)?;
1385        let in_add: i64 = conn
1386            .query_row(
1387                "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1388                params![tgt_id],
1389                |row| row.get(0),
1390            )
1391            .map_err(sqlite_err)?;
1392        conn.execute(
1393            "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1394            params![out_add, in_add, obs_count, now_us(), tgt_id],
1395        )
1396        .map_err(sqlite_err)?;
1397
1398        // Delete source entity.
1399        conn.execute(
1400            "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1401            params![src_id],
1402        )
1403        .map_err(sqlite_err)?;
1404        conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1405            .map_err(sqlite_err)?;
1406
1407        inc_graph_stat(&conn, "entities", -1)?;
1408        self.meta_remove(source);
1409
1410        // Reload target into cache.
1411        if let Ok(meta) = conn.query_row(
1412            "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1413            params![tgt_id],
1414            |row| {
1415                Ok(EntityMeta {
1416                    id: row.get(0)?,
1417                    type_id: row.get(1)?,
1418                    obs_count: row.get(2)?,
1419                    out_deg: row.get(3)?,
1420                    in_deg: row.get(4)?,
1421                })
1422            },
1423        ) {
1424            self.meta_set(target, meta);
1425        }
1426
1427        let (name, etype): (String, String) = conn
1428            .query_row(
1429                "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1430                params![tgt_id],
1431                |row| Ok((row.get(0)?, row.get(1)?)),
1432            )
1433            .map_err(sqlite_err)?;
1434        let observations = load_observations_opt(&conn, tgt_id);
1435
1436        Ok(Entity {
1437            name,
1438            entity_type: etype,
1439            observations,
1440        })
1441    }
1442
1443    pub fn search_nodes_filtered(
1444        &self,
1445        query: &str,
1446        filter_type: Option<&str>,
1447        offset: usize,
1448        limit: usize,
1449    ) -> Vec<Entity> {
1450        if query.is_empty() {
1451            return Vec::new();
1452        }
1453        let conn = self.readers.get();
1454
1455        // Single pass: collect IDs from name_fts then obs_fts, deduping with a set.
1456        let mut entity_ids: Vec<i64> = Vec::new();
1457        let mut seen: HashSet<i64> = HashSet::new();
1458
1459        if let Ok(mut stmt) = conn.prepare(
1460            "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1461        ) {
1462            let limit_i64 = (limit + offset) as i64;
1463            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1464                row.get::<_, i64>(0)
1465            }) {
1466                for row in rows.flatten() {
1467                    if seen.insert(row) {
1468                        entity_ids.push(row);
1469                    }
1470                }
1471            }
1472        }
1473
1474        if let Ok(mut stmt) = conn.prepare(
1475            "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1476             WHERE obs_fts MATCH ?1
1477             GROUP BY entity_id
1478             LIMIT ?2",
1479        ) {
1480            let limit_i64 = (limit + offset) as i64;
1481            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1482                row.get::<_, i64>(0)
1483            }) {
1484                for row in rows.flatten() {
1485                    if seen.insert(row) {
1486                        entity_ids.push(row);
1487                    }
1488                }
1489            }
1490        }
1491
1492        // Apply filter_type, offset, limit.
1493        let mut results = Vec::new();
1494        let mut count: usize = 0;
1495        for eid in entity_ids {
1496            if let Ok(entity) = entity_by_id(&conn, eid) {
1497                if let Some(ft) = filter_type
1498                    && !ft.is_empty() && entity.entity_type != ft {
1499                        continue;
1500                    }
1501                if count < offset {
1502                    count += 1;
1503                    continue;
1504                }
1505                if results.len() >= limit {
1506                    break;
1507                }
1508                results.push(entity);
1509                count += 1;
1510            }
1511        }
1512
1513        results
1514    }
1515
1516    pub fn read_graph_filtered(
1517        &self,
1518        filter_type: Option<&str>,
1519        offset: usize,
1520        limit: usize,
1521    ) -> Result<String> {
1522        let conn = self.readers.get();
1523
1524        let limit_sql: i64 = if limit == usize::MAX {
1525            -1
1526        } else {
1527            limit.min(i64::MAX as usize) as i64
1528        };
1529        let offset_sql: i64 = offset as i64;
1530
1531        // Resolve the requested page of entity ids first. Relations are then
1532        // scoped to edges whose *both* endpoints fall inside this page, which
1533        // keeps the response self-consistent (no dangling references to
1534        // entities that were paged out) and bounds the relation payload by the
1535        // page size instead of dumping every relation in the graph.
1536        let filter = filter_type.filter(|ft| !ft.is_empty());
1537        let ids: Vec<i64> = if let Some(ft) = filter {
1538            let mut stmt = conn
1539                .prepare_cached(
1540                    "SELECT e.id FROM entity e
1541                     WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1542                       AND e.flags = 0
1543                     ORDER BY e.id LIMIT ?2 OFFSET ?3",
1544                )
1545                .map_err(sqlite_err)?;
1546            stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1547                .map_err(sqlite_err)?
1548                .filter_map(|r| r.ok())
1549                .collect()
1550        } else {
1551            let mut stmt = conn
1552                .prepare_cached(
1553                    "SELECT e.id FROM entity e WHERE e.flags = 0
1554                     ORDER BY e.id LIMIT ?1 OFFSET ?2",
1555                )
1556                .map_err(sqlite_err)?;
1557            stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1558                .map_err(sqlite_err)?
1559                .filter_map(|r| r.ok())
1560                .collect()
1561        };
1562
1563        if ids.is_empty() {
1564            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1565        }
1566
1567        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1568
1569        let entities_json: String = {
1570            let sql = format!(
1571                "SELECT COALESCE(json_group_array(json_object(
1572                    'name', e.name,
1573                    'entityType', t.name,
1574                    'observations', COALESCE((
1575                        SELECT json_group_array(o.body ORDER BY o.idx)
1576                        FROM observation o WHERE o.entity_id = e.id
1577                    ), json('[]'))
1578                ) ORDER BY e.id), json('[]'))
1579                FROM entity e
1580                JOIN type_dict t ON t.id = e.type_id
1581                WHERE e.id IN ({placeholders}) AND e.flags = 0"
1582            );
1583            conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1584                row.get::<_, String>(0)
1585            })
1586            .map_err(sqlite_err)?
1587        };
1588
1589        let relations_json: String = {
1590            let sql = format!(
1591                "SELECT COALESCE(json_group_array(json_object(
1592                    'from', e1.name,
1593                    'to', e2.name,
1594                    'relationType', t.name
1595                )), json('[]'))
1596                FROM relation r
1597                JOIN entity e1 ON e1.id = r.from_id
1598                JOIN entity e2 ON e2.id = r.to_id
1599                JOIN type_dict t ON t.id = r.type_id
1600                WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1601                  AND e1.flags = 0 AND e2.flags = 0"
1602            );
1603            let all_params: Vec<&dyn ToSql> = ids
1604                .iter()
1605                .map(|id| id as &dyn ToSql)
1606                .chain(ids.iter().map(|id| id as &dyn ToSql))
1607                .collect();
1608            conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1609                .map_err(sqlite_err)?
1610        };
1611
1612        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1613        out.push_str("{\"entities\":");
1614        out.push_str(&entities_json);
1615        out.push_str(",\"relations\":");
1616        out.push_str(&relations_json);
1617        out.push('}');
1618        Ok(out)
1619    }
1620
1621    pub fn open_nodes(&self, names: &[String]) -> String {
1622        let conn = self.readers.get();
1623        let mut entity_ids: Vec<i64> = Vec::new();
1624
1625        for name in names {
1626            let h = name_hash(name);
1627            if let Ok(Some(id)) = conn
1628                .query_row(
1629                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1630                    params![h, name],
1631                    |row| row.get::<_, i64>(0),
1632                )
1633                .map(Some)
1634                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1635            {
1636                entity_ids.push(id);
1637            }
1638        }
1639
1640        if entity_ids.is_empty() {
1641            return r#"{"entities":[],"relations":[]}"#.to_string();
1642        }
1643
1644        let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1645        let ids_str = placeholders.join(",");
1646
1647        let entities_json: String = {
1648            let sql = format!(
1649                "SELECT COALESCE(json_group_array(json_object(
1650                    'name', e.name,
1651                    'entityType', t.name,
1652                    'observations', COALESCE((
1653                        SELECT json_group_array(o.body ORDER BY o.idx)
1654                        FROM observation o WHERE o.entity_id = e.id
1655                    ), json('[]'))
1656                ) ORDER BY e.id), json('[]'))
1657                FROM entity e
1658                JOIN type_dict t ON t.id = e.type_id
1659                WHERE e.id IN ({ids_str}) AND e.flags = 0"
1660            );
1661            conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1662                row.get::<_, String>(0)
1663            })
1664            .unwrap_or_else(|_| "[]".to_string())
1665        };
1666
1667        let relations_json: String = {
1668            let sql = format!(
1669                "SELECT COALESCE(json_group_array(json_object(
1670                    'from', e1.name,
1671                    'to', e2.name,
1672                    'relationType', t.name
1673                )), json('[]'))
1674                FROM relation r
1675                JOIN entity e1 ON e1.id = r.from_id
1676                JOIN entity e2 ON e2.id = r.to_id
1677                JOIN type_dict t ON t.id = r.type_id
1678                WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1679                  AND e1.flags = 0 AND e2.flags = 0"
1680            );
1681            let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1682                .iter()
1683                .map(|id| id as &dyn rusqlite::types::ToSql)
1684                .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1685                .collect();
1686            let mut stmt = conn.prepare(&sql).unwrap();
1687            stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1688                .unwrap_or_else(|_| "[]".to_string())
1689        };
1690
1691        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1692        out.push_str("{\"entities\":");
1693        out.push_str(&entities_json);
1694        out.push_str(",\"relations\":");
1695        out.push_str(&relations_json);
1696        out.push('}');
1697        out
1698    }
1699
1700    pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1701        let conn = self.readers.get();
1702        let mut results = Vec::with_capacity(names.len());
1703        for name in names {
1704            let h = name_hash(name);
1705            let exists: bool = conn
1706                .query_row(
1707                    "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1708                    params![h, name],
1709                    |_| Ok(()),
1710                )
1711                .is_ok();
1712            results.push(exists);
1713        }
1714        Ok(results)
1715    }
1716
1717    pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1718        let conn = self.readers.get();
1719        let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1720            Some(v) => v,
1721            None => {
1722                return Err(MCSError::InvalidParams(format!(
1723                    "Entity '{name}' not found"
1724                )))
1725            }
1726        };
1727        Ok(match direction {
1728            Direction::Outgoing => out_d as usize,
1729            Direction::Incoming => in_d as usize,
1730            Direction::Both => (out_d + in_d) as usize,
1731        })
1732    }
1733
1734    pub fn get_entity_count(&self) -> Result<usize> {
1735        let conn = self.readers.get();
1736        read_graph_stat(&conn, "entities")
1737            .map(|v| v as usize)
1738            .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1739    }
1740
1741    pub fn get_relation_count(&self) -> Result<usize> {
1742        let conn = self.readers.get();
1743        read_graph_stat(&conn, "relations")
1744            .map(|v| v as usize)
1745            .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1746    }
1747
1748    pub fn search_relations(
1749        &self,
1750        from: Option<&str>,
1751        to: Option<&str>,
1752        rtype: Option<&str>,
1753        limit: Option<usize>,
1754    ) -> Vec<Relation> {
1755        let conn = self.readers.get();
1756        let mut results = Vec::new();
1757
1758        // A filter that is supplied but resolves to nothing uses the sentinel
1759        // id -1 (which matches no row), so the query returns empty rather than
1760        // silently dropping the filter and matching every relation. The lookups
1761        // are read-only — `get_type_id` would *insert* a phantom type, which is
1762        // both wrong and impossible on a `query_only` reader connection.
1763        let from_id = from
1764            .filter(|f| !f.is_empty())
1765            .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1766        let to_id = to
1767            .filter(|t| !t.is_empty())
1768            .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1769        let type_id = rtype
1770            .filter(|rt| !rt.is_empty())
1771            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1772
1773        match (from_id, to_id, type_id) {
1774            (Some(fid), Some(tid), Some(tpid)) => {
1775                if let Ok(mut stmt) = conn.prepare_cached(
1776                    "SELECT e1.name, e2.name, t.name
1777                     FROM relation r
1778                     JOIN entity e1 ON e1.id = r.from_id
1779                     JOIN entity e2 ON e2.id = r.to_id
1780                     JOIN type_dict t ON t.id = r.type_id
1781                     WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1782                       AND e1.flags = 0 AND e2.flags = 0
1783                     ORDER BY r.from_id, r.to_id"
1784                )
1785                    && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1786                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1787                    }) {
1788                        for row in rows.flatten() { results.push(row); }
1789                    }
1790            }
1791            (Some(fid), Some(tid), None) => {
1792                if let Ok(mut stmt) = conn.prepare_cached(
1793                    "SELECT e1.name, e2.name, t.name
1794                     FROM relation r
1795                     JOIN entity e1 ON e1.id = r.from_id
1796                     JOIN entity e2 ON e2.id = r.to_id
1797                     JOIN type_dict t ON t.id = r.type_id
1798                     WHERE r.from_id = ?1 AND r.to_id = ?2
1799                       AND e1.flags = 0 AND e2.flags = 0
1800                     ORDER BY r.from_id, r.to_id"
1801                )
1802                    && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1803                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1804                    }) {
1805                        for row in rows.flatten() { results.push(row); }
1806                    }
1807            }
1808            (Some(fid), None, Some(tpid)) => {
1809                if let Ok(mut stmt) = conn.prepare_cached(
1810                    "SELECT e1.name, e2.name, t.name
1811                     FROM relation r
1812                     JOIN entity e1 ON e1.id = r.from_id
1813                     JOIN entity e2 ON e2.id = r.to_id
1814                     JOIN type_dict t ON t.id = r.type_id
1815                     WHERE r.from_id = ?1 AND r.type_id = ?2
1816                       AND e1.flags = 0 AND e2.flags = 0
1817                     ORDER BY r.from_id, r.to_id"
1818                )
1819                    && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1820                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1821                    }) {
1822                        for row in rows.flatten() { results.push(row); }
1823                    }
1824            }
1825            (None, Some(tid), Some(tpid)) => {
1826                if let Ok(mut stmt) = conn.prepare_cached(
1827                    "SELECT e1.name, e2.name, t.name
1828                     FROM relation r
1829                     JOIN entity e1 ON e1.id = r.from_id
1830                     JOIN entity e2 ON e2.id = r.to_id
1831                     JOIN type_dict t ON t.id = r.type_id
1832                     WHERE r.to_id = ?1 AND r.type_id = ?2
1833                       AND e1.flags = 0 AND e2.flags = 0
1834                     ORDER BY r.from_id, r.to_id"
1835                )
1836                    && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1837                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1838                    }) {
1839                        for row in rows.flatten() { results.push(row); }
1840                    }
1841            }
1842            (Some(fid), None, None) => {
1843                if let Ok(mut stmt) = conn.prepare_cached(
1844                    "SELECT e1.name, e2.name, t.name
1845                     FROM relation r
1846                     JOIN entity e1 ON e1.id = r.from_id
1847                     JOIN entity e2 ON e2.id = r.to_id
1848                     JOIN type_dict t ON t.id = r.type_id
1849                     WHERE r.from_id = ?1
1850                       AND e1.flags = 0 AND e2.flags = 0
1851                     ORDER BY r.from_id, r.to_id"
1852                )
1853                    && let Ok(rows) = stmt.query_map(params![fid], |row| {
1854                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1855                    }) {
1856                        for row in rows.flatten() { results.push(row); }
1857                    }
1858            }
1859            (None, Some(tid), None) => {
1860                if let Ok(mut stmt) = conn.prepare_cached(
1861                    "SELECT e1.name, e2.name, t.name
1862                     FROM relation r
1863                     JOIN entity e1 ON e1.id = r.from_id
1864                     JOIN entity e2 ON e2.id = r.to_id
1865                     JOIN type_dict t ON t.id = r.type_id
1866                     WHERE r.to_id = ?1
1867                       AND e1.flags = 0 AND e2.flags = 0
1868                     ORDER BY r.from_id, r.to_id"
1869                )
1870                    && let Ok(rows) = stmt.query_map(params![tid], |row| {
1871                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1872                    }) {
1873                        for row in rows.flatten() { results.push(row); }
1874                    }
1875            }
1876            (None, None, Some(tpid)) => {
1877                if let Ok(mut stmt) = conn.prepare_cached(
1878                    "SELECT e1.name, e2.name, t.name
1879                     FROM relation r
1880                     JOIN entity e1 ON e1.id = r.from_id
1881                     JOIN entity e2 ON e2.id = r.to_id
1882                     JOIN type_dict t ON t.id = r.type_id
1883                     WHERE r.type_id = ?1
1884                       AND e1.flags = 0 AND e2.flags = 0
1885                     ORDER BY r.from_id, r.to_id"
1886                )
1887                    && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1888                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1889                    }) {
1890                        for row in rows.flatten() { results.push(row); }
1891                    }
1892            }
1893            (None, None, None) => {
1894                if let Ok(mut stmt) = conn.prepare_cached(
1895                    "SELECT e1.name, e2.name, t.name
1896                     FROM relation r
1897                     JOIN entity e1 ON e1.id = r.from_id
1898                     JOIN entity e2 ON e2.id = r.to_id
1899                     JOIN type_dict t ON t.id = r.type_id
1900                     WHERE e1.flags = 0 AND e2.flags = 0
1901                     ORDER BY r.from_id, r.to_id"
1902                )
1903                    && let Ok(rows) = stmt.query_map([], |row| {
1904                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1905                    }) {
1906                        for row in rows.flatten() { results.push(row); }
1907                    }
1908            }
1909        }
1910        if let Some(lim) = limit {
1911            results.truncate(lim);
1912        }
1913        results
1914    }
1915
1916    pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1917        let conn = self.readers.get();
1918        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1919            Some(v) => v,
1920            None => {
1921                return Err(MCSError::InvalidParams(format!(
1922                    "Source entity '{from}' not found"
1923                )))
1924            }
1925        };
1926        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1927            Some(v) => v,
1928            None => {
1929                return Err(MCSError::InvalidParams(format!(
1930                    "Target entity '{to}' not found"
1931                )))
1932            }
1933        };
1934
1935        if from_id == to_id {
1936            return Ok(Some(vec![from.to_string()]));
1937        }
1938
1939        // BFS with adjacency from relation table.
1940        let mut visited = HashSet::new();
1941        let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1942        let mut queue = VecDeque::new();
1943        visited.insert(from_id);
1944        queue.push_back(from_id);
1945
1946        while let Some(cur) = queue.pop_front() {
1947            if cur == to_id {
1948                break;
1949            }
1950            // Fetch out-neighbors.
1951            if let Ok(mut stmt) =
1952                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1953                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1954                    for row in rows.flatten() {
1955                        if visited.insert(row) {
1956                            parent.insert(row, cur);
1957                            queue.push_back(row);
1958                        }
1959                    }
1960                }
1961            // Also check in-neighbors (undirected traversal).
1962            if let Ok(mut stmt) =
1963                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1964                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1965                    for row in rows.flatten() {
1966                        if visited.insert(row) {
1967                            parent.insert(row, cur);
1968                            queue.push_back(row);
1969                        }
1970                    }
1971                }
1972        }
1973
1974        if !parent.contains_key(&to_id) && to_id != from_id {
1975            return Ok(None);
1976        }
1977
1978        let mut path = Vec::new();
1979        let mut cur = to_id;
1980        path.push(cur);
1981        while let Some(&p) = parent.get(&cur) {
1982            path.push(p);
1983            cur = p;
1984            if cur == from_id {
1985                break;
1986            }
1987        }
1988        path.reverse();
1989
1990        let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1991        let sql = format!(
1992            "SELECT id, name FROM entity WHERE id IN ({})",
1993            placeholders.join(",")
1994        );
1995        let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1996            && let Ok(rows) = stmt.query_map(
1997                rusqlite::params_from_iter(&path),
1998                |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1999            ) {
2000            rows.flatten().collect()
2001        } else {
2002            FxHashMap::default()
2003        };
2004
2005        let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2006
2007        Ok(Some(name_path))
2008    }
2009
2010    pub fn compact(&self) -> Result<()> {
2011        let conn = self.writer.lock();
2012        conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
2013        Ok(())
2014    }
2015
    /// Traverses the graph starting at the entity `name` and returns the
    /// result as a string.
    ///
    /// Thin wrapper: forwards `name`, `direction`, `rtype` and `depth`
    /// unchanged to `_traverse`, passing `true` as its final argument.
    ///
    /// NOTE(review): the meaning of that final flag and the exact output
    /// format are defined by `_traverse`, which is not visible from here —
    /// confirm there before relying on them.
    pub fn neighbors(
        &self,
        name: &str,
        direction: Direction,
        rtype: Option<&str>,
        depth: u32,
    ) -> Result<String> {
        self._traverse(name, direction, rtype, depth, true)
    }
2025
2026    pub fn extract_subgraph(
2027        &self,
2028        names: &[String],
2029        depth: u32,
2030    ) -> Result<String> {
2031        if names.is_empty() {
2032            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2033        }
2034
2035        let conn = self.readers.get();
2036        let mut all_entity_ids: HashSet<i64> = HashSet::new();
2037        let mut frontier: HashSet<i64> = HashSet::new();
2038        let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2039
2040        // Resolve seed entities.
2041        for name in names {
2042            let h = name_hash(name);
2043            if let Ok(Some(id)) = conn
2044                .query_row(
2045                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2046                    params![h, name],
2047                    |row| row.get::<_, i64>(0),
2048                )
2049                .map(Some)
2050                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2051            {
2052                all_entity_ids.insert(id);
2053                frontier.insert(id);
2054            }
2055        }
2056
2057        let mut current_depth = 0u32;
2058        while current_depth < depth && !frontier.is_empty() {
2059            let mut next_frontier: HashSet<i64> = HashSet::new();
2060
2061            // Collect relations for current frontier — batched IN queries
2062            // instead of one query per frontier entity.
2063            const CHUNK: usize = 500;
2064            let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2065            for chunk in frontier_ids.chunks(CHUNK) {
2066                let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2067                let in_clause = placeholders.join(",");
2068
2069                // Forward: from_id IN chunk.
2070                if let Ok(mut stmt) = conn.prepare(
2071                    &format!(
2072                        "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2073                    )
2074                )
2075                    && let Ok(rows) = stmt.query_map(
2076                        rusqlite::params_from_iter(chunk),
2077                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2078                    )
2079                {
2080                    for row in rows.flatten() {
2081                        let (from_id, to_id, type_id) = row;
2082                        all_rel_pairs.insert((from_id, to_id, type_id));
2083                        if all_entity_ids.insert(to_id) {
2084                            next_frontier.insert(to_id);
2085                        }
2086                    }
2087                }
2088
2089                // Backward: to_id IN chunk.
2090                if let Ok(mut stmt) = conn.prepare(
2091                    &format!(
2092                        "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2093                    )
2094                )
2095                    && let Ok(rows) = stmt.query_map(
2096                        rusqlite::params_from_iter(chunk),
2097                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2098                    )
2099                {
2100                    for row in rows.flatten() {
2101                        let (from_id, to_id, type_id) = row;
2102                        all_rel_pairs.insert((from_id, to_id, type_id));
2103                        if all_entity_ids.insert(from_id) {
2104                            next_frontier.insert(from_id);
2105                        }
2106                    }
2107                }
2108            }
2109            if all_entity_ids.len() > MAX_TRAVERSAL_ENTITIES
2110                || all_rel_pairs.len() > MAX_TRAVERSAL_RELS
2111            {
2112                break;
2113            }
2114            frontier = next_frontier;
2115            current_depth += 1;
2116        }
2117
2118        let entities_json: String = {
2119            if all_entity_ids.is_empty() {
2120                "[]".to_string()
2121            } else {
2122                let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2123                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2124                let sql = format!(
2125                    "SELECT COALESCE(json_group_array(json_object(
2126                        'name', e.name,
2127                        'entityType', t.name,
2128                        'observations', COALESCE((
2129                            SELECT json_group_array(o.body ORDER BY o.idx)
2130                            FROM observation o WHERE o.entity_id = e.id
2131                        ), json('[]'))
2132                    ) ORDER BY e.id), json('[]'))
2133                    FROM entity e
2134                    JOIN type_dict t ON t.id = e.type_id
2135                    WHERE e.id IN ({}) AND e.flags = 0",
2136                    placeholders.join(",")
2137                );
2138                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2139                    row.get::<_, String>(0)
2140                })
2141                .unwrap_or_else(|_| "[]".to_string())
2142            }
2143        };
2144
2145        let relations_json: String = {
2146            if all_rel_pairs.is_empty() {
2147                "[]".to_string()
2148            } else {
2149                let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2150                let sql = format!(
2151                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2152                    SELECT COALESCE(json_group_array(json_object(
2153                        'from', e1.name,
2154                        'to', e2.name,
2155                        'relationType', t.name
2156                    )), json('[]'))
2157                    FROM r
2158                    JOIN entity e1 ON e1.id = r.from_id
2159                    JOIN entity e2 ON e2.id = r.to_id
2160                    JOIN type_dict t ON t.id = r.type_id
2161                    WHERE e1.flags = 0 AND e2.flags = 0",
2162                    vals.join(", ")
2163                );
2164                let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2165                    .flat_map(|(f, t, tp)| {
2166                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2167                    })
2168                    .collect();
2169                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2170                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2171                    .unwrap_or_else(|_| "[]".to_string())
2172            }
2173        };
2174
2175        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2176        out.push_str("{\"entities\":");
2177        out.push_str(&entities_json);
2178        out.push_str(",\"relations\":");
2179        out.push_str(&relations_json);
2180        out.push('}');
2181        Ok(out)
2182    }
2183
2184    pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2185        self.get_entity(name)?
2186            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2187    }
2188
    /// Returns `(type name, count)` pairs as produced by `select_all_types`
    /// called with kind `0`.
    ///
    /// Best-effort: if `select_all_types` does not yield a value, an empty
    /// list is returned instead.
    ///
    /// NOTE(review): kind `0` presumably selects entity types (relation-type
    /// code in this file uses kind `1`) — confirm against `select_all_types`.
    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
        let conn = self.readers.get();
        select_all_types(&conn, 0).unwrap_or_default()
    }
2193
    /// Returns `(type name, count)` pairs as produced by `select_all_types`
    /// called with kind `1`.
    ///
    /// Best-effort: if `select_all_types` does not yield a value, an empty
    /// list is returned instead.
    ///
    /// NOTE(review): kind `1` presumably selects relation types (it is the
    /// kind `search_relations` passes when resolving a relation type) —
    /// confirm against `select_all_types`.
    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
        let conn = self.readers.get();
        select_all_types(&conn, 1).unwrap_or_default()
    }
2198
2199    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2200        names
2201            .iter()
2202            .map(|n| self.get_entity(n).unwrap_or(None))
2203            .collect()
2204    }
2205
2206    pub fn find_all_paths(
2207        &self,
2208        from: &str,
2209        to: &str,
2210        max_depth: usize,
2211        max_paths: usize,
2212    ) -> Result<Vec<Vec<String>>> {
2213        let conn = self.readers.get();
2214        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2215            Some(v) => v,
2216            None => {
2217                return Err(MCSError::InvalidParams(format!(
2218                    "Source entity '{from}' not found"
2219                )))
2220            }
2221        };
2222        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2223            Some(v) => v,
2224            None => {
2225                return Err(MCSError::InvalidParams(format!(
2226                    "Target entity '{to}' not found"
2227                )))
2228            }
2229        };
2230
2231        if from_id == to_id {
2232            return Ok(vec![vec![from.to_string()]]);
2233        }
2234
2235        // BFS enumerating all paths up to max_depth.
2236        let mut all_paths: Vec<Vec<i64>> = Vec::new();
2237        let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2238        queue.push_back((from_id, vec![from_id]));
2239
2240        const MAX_QUEUE_SIZE: usize = 10_000_000;
2241
2242        while let Some((cur, path)) = queue.pop_front() {
2243            if all_paths.len() >= max_paths {
2244                break;
2245            }
2246            if path.len() > max_depth {
2247                continue;
2248            }
2249
2250            // Out-neighbors.
2251            if let Ok(mut stmt) =
2252                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2253                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2254                    for next_id in rows.flatten() {
2255                        if next_id == to_id {
2256                            let mut full_path = path.clone();
2257                            full_path.push(next_id);
2258                            all_paths.push(full_path);
2259                            if all_paths.len() >= max_paths {
2260                                break;
2261                            }
2262                        } else if !path.contains(&next_id) && path.len() < max_depth {
2263                            if queue.len() >= MAX_QUEUE_SIZE {
2264                                return Err(MCSError::InvalidParams(
2265                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2266                                ));
2267                            }
2268                            let mut new_path = path.clone();
2269                            new_path.push(next_id);
2270                            queue.push_back((next_id, new_path));
2271                        }
2272                    }
2273                }
2274
2275            // In-neighbors (undirected).
2276            if let Ok(mut stmt) =
2277                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2278                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2279                    for next_id in rows.flatten() {
2280                        if next_id == to_id {
2281                            let mut full_path = path.clone();
2282                            full_path.push(next_id);
2283                            all_paths.push(full_path);
2284                            if all_paths.len() >= max_paths {
2285                                break;
2286                            }
2287                        } else if !path.contains(&next_id) && path.len() < max_depth {
2288                            if queue.len() >= MAX_QUEUE_SIZE {
2289                                return Err(MCSError::InvalidParams(
2290                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2291                                ));
2292                            }
2293                            let mut new_path = path.clone();
2294                            new_path.push(next_id);
2295                            queue.push_back((next_id, new_path));
2296                        }
2297                    }
2298                }
2299        }
2300
2301        // Convert ids to names — one batch query instead of N lookups per path.
2302        let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2303        let id_list: Vec<i64> = all_ids.into_iter().collect();
2304        let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2305            FxHashMap::default()
2306        } else {
2307            let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2308            let sql = format!(
2309                "SELECT id, name FROM entity WHERE id IN ({})",
2310                placeholders.join(",")
2311            );
2312            if let Ok(mut stmt) = conn.prepare(&sql)
2313                && let Ok(rows) = stmt.query_map(
2314                    rusqlite::params_from_iter(&id_list),
2315                    |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2316                ) {
2317                rows.flatten().collect()
2318            } else {
2319                FxHashMap::default()
2320            }
2321        };
2322
2323        let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2324        for path_ids in all_paths {
2325            let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2326            named_paths.push(named);
2327        }
2328
2329        Ok(named_paths)
2330    }
2331
2332    /// Export the whole graph as a JSON string. `max_rows` caps both the entity
2333    /// and relation arrays so a pathologically large graph cannot be coerced
2334    /// into an unbounded in-memory string (DoS guard); callers pass a generous
2335    /// constant. A negative value means "no limit".
2336    pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2337        let conn = self.readers.get();
2338        // Only JSON is supported; the format argument is accepted for forward
2339        // compatibility.
2340        conn.query_row(
2341            "SELECT json_object(
2342                'entities', COALESCE((
2343                    SELECT json_group_array(json_object(
2344                        'name', e.name,
2345                        'entityType', t.name,
2346                        'observations', COALESCE((
2347                            SELECT json_group_array(o.body ORDER BY o.idx)
2348                            FROM observation o WHERE o.entity_id = e.id
2349                        ), json('[]'))
2350                    ) ORDER BY e.id)
2351                    FROM (
2352                        SELECT id, name, type_id FROM entity
2353                        WHERE flags = 0 ORDER BY id LIMIT ?1
2354                    ) e
2355                    JOIN type_dict t ON t.id = e.type_id
2356                ), json('[]')),
2357                'relations', COALESCE((
2358                    SELECT json_group_array(json_object(
2359                        'from', e1.name,
2360                        'to', e2.name,
2361                        'relationType', t.name
2362                    ))
2363                    FROM (
2364                        SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2365                    ) r
2366                    JOIN entity e1 ON e1.id = r.from_id
2367                    JOIN entity e2 ON e2.id = r.to_id
2368                    JOIN type_dict t ON t.id = r.type_id
2369                    WHERE e1.flags = 0 AND e2.flags = 0
2370                ), json('[]'))
2371            )",
2372            params![max_rows],
2373            |row| row.get::<_, String>(0),
2374        )
2375        .map_err(sqlite_err)
2376    }
2377
2378    pub fn wipe(&self) -> Result<()> {
2379        let conn = self.writer.lock();
2380        // `name_fts`/`obs_fts` are external-content FTS5 tables; the supported
2381        // way to empty them is the special `'delete-all'` command, not a bare
2382        // `DELETE FROM` (which is invalid for external-content tables and can
2383        // leave the index inconsistent). Run it after clearing the content rows.
2384        conn.execute_batch(
2385            "DELETE FROM observation;
2386             DELETE FROM relation;
2387             DELETE FROM entity;
2388             DELETE FROM type_dict;
2389             INSERT INTO name_fts(name_fts) VALUES('delete-all');
2390             INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2391             UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2392             UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2393        )
2394        .map_err(sqlite_err)?;
2395        self.seq_entity.store(0, Ordering::Relaxed);
2396        self.seq_obs.store(0, Ordering::Relaxed);
2397        self.cache.lock().clear();
2398        Ok(())
2399    }
2400
2401    /// Periodic database maintenance: WAL checkpoint, query planner analysis,
2402    /// and FTS index optimization. Call from a background timer.
2403    pub fn run_maintenance(&self) -> Result<()> {
2404        let conn = self.writer.lock();
2405
2406        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2407            .map_err(sqlite_err)?;
2408
2409        conn.execute_batch("PRAGMA optimize(0x10000);")
2410            .map_err(sqlite_err)?;
2411
2412        conn.execute_batch(
2413            "INSERT INTO name_fts(name_fts) VALUES('optimize');
2414             INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2415        )
2416        .map_err(sqlite_err)?;
2417
2418        Ok(())
2419    }
2420
2421    /// Run a non-blocking `wal_checkpoint(PASSIVE)` to fsync committed WAL frames
2422    /// without stalling readers or writers. Call from a short-interval timer to
2423    /// bound the durability window in `async` mode.
2424    pub fn checkpoint_passive(&self) -> Result<()> {
2425        let conn = self.writer.lock();
2426        conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2427            .map_err(sqlite_err)?;
2428        Ok(())
2429    }
2430
2431    fn _traverse(
2432        &self,
2433        name: &str,
2434        direction: Direction,
2435        rtype: Option<&str>,
2436        depth: u32,
2437        // unused — we always include relations; the caller controls via depth
2438        _include_relations: bool,
2439    ) -> Result<String> {
2440        let conn = self.readers.get();
2441        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2442            Some(v) => v,
2443            None => {
2444                return Err(MCSError::InvalidParams(format!(
2445                    "Entity '{name}' not found"
2446                )))
2447            }
2448        };
2449
2450        let mut all_ids: HashSet<i64> = HashSet::new();
2451        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2452        let mut frontier: HashSet<i64> = HashSet::new();
2453        all_ids.insert(start_id);
2454        frontier.insert(start_id);
2455
2456        // Read-only type resolution. A requested-but-missing type uses the
2457        // sentinel id -1 (matches no edge), so traversal yields just the start
2458        // entity instead of falling back to "no type filter" and walking every
2459        // edge. `get_type_id` is avoided here: it inserts and cannot run on the
2460        // `query_only` reader connection.
2461        let type_filter: Option<i64> = rtype
2462            .filter(|rt| !rt.is_empty())
2463            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2464
2465        // Pre-compile all four possible queries outside the loop.
2466        let mut q_out_t = conn.prepare_cached(
2467            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2468        let mut q_out   = conn.prepare_cached(
2469            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2470        let mut q_in_t  = conn.prepare_cached(
2471            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2472        let mut q_in    = conn.prepare_cached(
2473            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2474
2475        let mut cur_depth = 0u32;
2476        while cur_depth < depth && !frontier.is_empty() {
2477            let mut next_frontier: HashSet<i64> = HashSet::new();
2478
2479            for &fid in &frontier {
2480                if direction == Direction::Outgoing || direction == Direction::Both {
2481                    if let Some(tid) = type_filter {
2482                        if let Ok(ref mut stmt) = q_out_t
2483                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2484                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2485                            }) {
2486                                for row in rows.flatten() {
2487                                    let (to_id, t_id) = row;
2488                                    all_rels.insert((fid, to_id, t_id));
2489                                    if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2490                                }
2491                            }
2492                    } else if let Ok(ref mut stmt) = q_out
2493                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2494                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2495                        }) {
2496                            for row in rows.flatten() {
2497                                let (to_id, t_id) = row;
2498                                all_rels.insert((fid, to_id, t_id));
2499                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2500                            }
2501                        }
2502                }
2503
2504                if direction == Direction::Incoming || direction == Direction::Both {
2505                    if let Some(tid) = type_filter {
2506                        if let Ok(ref mut stmt) = q_in_t
2507                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2508                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2509                            }) {
2510                                for row in rows.flatten() {
2511                                    let (from_id, t_id) = row;
2512                                    all_rels.insert((from_id, fid, t_id));
2513                                    if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2514                                }
2515                            }
2516                    } else if let Ok(ref mut stmt) = q_in
2517                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2518                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2519                        }) {
2520                            for row in rows.flatten() {
2521                                let (from_id, t_id) = row;
2522                                all_rels.insert((from_id, fid, t_id));
2523                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2524                            }
2525                        }
2526                }
2527            }
2528
2529            // DoS guard: stop traversal if we've collected too many entities
2530            // or relations. The response will be partial, which is preferable
2531            // to an OOM crash on densely connected graphs.
2532            if all_ids.len() > MAX_TRAVERSAL_ENTITIES || all_rels.len() > MAX_TRAVERSAL_RELS {
2533                break;
2534            }
2535
2536            frontier = next_frontier;
2537            cur_depth += 1;
2538        }
2539
2540        let entities_json: String = {
2541            if all_ids.is_empty() {
2542                "[]".to_string()
2543            } else {
2544                let ids: Vec<i64> = all_ids.iter().copied().collect();
2545                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2546                let sql = format!(
2547                    "SELECT COALESCE(json_group_array(json_object(
2548                        'name', e.name,
2549                        'entityType', t.name,
2550                        'observations', COALESCE((
2551                            SELECT json_group_array(o.body ORDER BY o.idx)
2552                            FROM observation o WHERE o.entity_id = e.id
2553                        ), json('[]'))
2554                    ) ORDER BY e.id), json('[]'))
2555                    FROM entity e
2556                    JOIN type_dict t ON t.id = e.type_id
2557                    WHERE e.id IN ({}) AND e.flags = 0",
2558                    placeholders.join(",")
2559                );
2560                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2561                    row.get::<_, String>(0)
2562                })
2563                .unwrap_or_else(|_| "[]".to_string())
2564            }
2565        };
2566
2567        let relations_json: String = {
2568            if all_rels.is_empty() {
2569                "[]".to_string()
2570            } else {
2571                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2572                let sql = format!(
2573                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2574                    SELECT COALESCE(json_group_array(json_object(
2575                        'from', e1.name,
2576                        'to', e2.name,
2577                        'relationType', t.name
2578                    )), json('[]'))
2579                    FROM r
2580                    JOIN entity e1 ON e1.id = r.from_id
2581                    JOIN entity e2 ON e2.id = r.to_id
2582                    JOIN type_dict t ON t.id = r.type_id
2583                    WHERE e1.flags = 0 AND e2.flags = 0",
2584                    vals.join(", ")
2585                );
2586                let params: Vec<&dyn ToSql> = all_rels.iter()
2587                    .flat_map(|(f, t, tp)| {
2588                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2589                    })
2590                    .collect();
2591                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2592                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2593                    .unwrap_or_else(|_| "[]".to_string())
2594            }
2595        };
2596
2597        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2598        out.push_str("{\"entities\":");
2599        out.push_str(&entities_json);
2600        out.push_str(",\"relations\":");
2601        out.push_str(&relations_json);
2602        out.push('}');
2603        Ok(out)
2604    }
2605}
2606
2607// ── Tests ────────────────────────────────────────────────────────────────
2608
2609#[cfg(test)]
2610mod tests {
2611    use super::*;
2612    use serde_json::Value;
2613    use std::ops::Deref;
2614    use std::path::PathBuf;
2615
    /// Test fixture: a `GraphHandle` (field 0) together with the path of its
    /// database file (field 1). The `Drop` impl passes that path to
    /// `cleanup_db`.
    struct TestKg(GraphHandle, PathBuf);
2617
2618    impl Deref for TestKg {
2619        type Target = GraphHandle;
2620        fn deref(&self) -> &GraphHandle {
2621            &self.0
2622        }
2623    }
2624
2625    impl Drop for TestKg {
2626        fn drop(&mut self) {
2627            cleanup_db(&self.1);
2628        }
2629    }
2630
2631    fn cleanup_db(path: &std::path::Path) {
2632        let _ = std::fs::remove_file(path);
2633        let _ = std::fs::remove_file(path.with_extension("db-wal"));
2634        let _ = std::fs::remove_file(path.with_extension("db-shm"));
2635    }
2636
2637    fn new_kg() -> TestKg {
2638        use std::sync::atomic::AtomicU64;
2639        use std::sync::atomic::Ordering;
2640        static COUNTER: AtomicU64 = AtomicU64::new(0);
2641        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2642        let dir = std::env::temp_dir();
2643        let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2644        cleanup_db(&path);
2645        let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2646        TestKg(kg, path)
2647    }
2648
2649    #[test]
2650    fn test_create_and_get_entity() {
2651        let kg = new_kg();
2652        let entities = vec![Entity {
2653            name: "test".into(),
2654            entity_type: "person".into(),
2655            observations: vec!["obs1".into(), "obs2".into()],
2656        }];
2657        let created = kg.create_entities(&entities).unwrap();
2658        assert_eq!(created.len(), 1);
2659
2660        let got = kg.get_entity("test").unwrap().unwrap();
2661        assert_eq!(got.name, "test");
2662        assert_eq!(got.entity_type, "person");
2663        assert_eq!(got.observations, vec!["obs1", "obs2"]);
2664    }
2665
2666    #[test]
2667    fn test_get_nonexistent() {
2668        let kg = new_kg();
2669        assert!(kg.get_entity("nonexistent").unwrap().is_none());
2670    }
2671
2672    #[test]
2673    fn test_delete_entity() {
2674        let kg = new_kg();
2675        kg.create_entities(&[Entity {
2676            name: "del".into(),
2677            entity_type: "t".into(),
2678            observations: vec![],
2679        }])
2680        .unwrap();
2681        assert!(kg.get_entity("del").unwrap().is_some());
2682        kg.delete_entities(&["del".to_string()]).unwrap();
2683        assert!(kg.get_entity("del").unwrap().is_none());
2684    }
2685
2686    #[test]
2687    fn test_add_and_delete_observations() {
2688        let kg = new_kg();
2689        kg.create_entities(&[Entity {
2690            name: "obs_test".into(),
2691            entity_type: "t".into(),
2692            observations: vec!["a".into()],
2693        }])
2694        .unwrap();
2695
2696        let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2697        assert_eq!(added.len(), 2);
2698
2699        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2700        assert!(ent.observations.contains(&"b".into()));
2701        assert!(ent.observations.contains(&"c".into()));
2702
2703        kg.delete_observations("obs_test", &["b".into()]).unwrap();
2704        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2705        assert!(!ent.observations.contains(&"b".into()));
2706        assert!(ent.observations.contains(&"c".into()));
2707        assert!(ent.observations.contains(&"a".into()));
2708    }
2709
2710    #[test]
2711    fn test_create_relations() {
2712        let kg = new_kg();
2713        kg.create_entities(&[
2714            Entity {
2715                name: "A".into(),
2716                entity_type: "node".into(),
2717                observations: vec![],
2718            },
2719            Entity {
2720                name: "B".into(),
2721                entity_type: "node".into(),
2722                observations: vec![],
2723            },
2724        ])
2725        .unwrap();
2726
2727        let rels = kg
2728            .create_relations(&[Relation {
2729                from: "A".into(),
2730                to: "B".into(),
2731                relation_type: "edge".into(),
2732            }])
2733            .unwrap();
2734        assert_eq!(rels.len(), 1);
2735
2736        assert_eq!(kg.get_entity_count().unwrap(), 2);
2737        assert_eq!(kg.get_relation_count().unwrap(), 1);
2738    }
2739
2740    #[test]
2741    fn test_search_nodes() {
2742        let kg = new_kg();
2743        kg.create_entities(&[Entity {
2744            name: "Einstein".into(),
2745            entity_type: "scientist".into(),
2746            observations: vec!["physics".into(), "relativity".into()],
2747        }])
2748        .unwrap();
2749
2750        let results = kg.search_nodes_filtered("physics", None, 0, 10);
2751        assert_eq!(results.len(), 1);
2752        assert_eq!(results[0].name, "Einstein");
2753
2754        let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2755        assert_eq!(results.len(), 1);
2756
2757        let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2758        assert_eq!(results.len(), 0);
2759    }
2760
2761    #[test]
2762    fn test_find_path() {
2763        let kg = new_kg();
2764        kg.create_entities(&[
2765            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2766            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2767            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2768        ]).unwrap();
2769
2770        kg.create_relations(&[
2771            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2772            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2773        ]).unwrap();
2774
2775        let path = kg.find_path("A", "C").unwrap().unwrap();
2776        assert_eq!(path, vec!["A", "B", "C"]);
2777    }
2778
2779    #[test]
2780    fn test_degree() {
2781        let kg = new_kg();
2782        kg.create_entities(&[
2783            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2784            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2785            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2786        ]).unwrap();
2787
2788        kg.create_relations(&[
2789            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2790            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2791        ]).unwrap();
2792
2793        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2794        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2795        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2796    }
2797
2798    #[test]
2799    fn test_neighbors() {
2800        let kg = new_kg();
2801        kg.create_entities(&[
2802            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2803            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2804        ]).unwrap();
2805
2806        kg.create_relations(&[Relation {
2807            from: "A".into(), to: "B".into(), relation_type: "e".into(),
2808        }]).unwrap();
2809
2810        let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2811        let v: Value = serde_json::from_str(&result).unwrap();
2812        assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2813        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2814    }
2815
2816    #[test]
2817    fn test_open_nodes() {
2818        let kg = new_kg();
2819        kg.create_entities(&[
2820            Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2821            Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2822        ]).unwrap();
2823
2824        kg.create_relations(&[Relation {
2825            from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2826        }]).unwrap();
2827
2828        let result = kg.open_nodes(&["X".into()]);
2829        let v: Value = serde_json::from_str(&result).unwrap();
2830        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2831        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2832    }
2833
2834    #[test]
2835    fn test_entities_exist() {
2836        let kg = new_kg();
2837        kg.create_entities(&[Entity {
2838            name: "exists".into(), entity_type: "t".into(), observations: vec![],
2839        }]).unwrap();
2840
2841        let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2842        assert_eq!(res, vec![true, false]);
2843    }
2844
2845    #[test]
2846    fn test_describe_entity() {
2847        let kg = new_kg();
2848        kg.create_entities(&[Entity {
2849            name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2850        }]).unwrap();
2851
2852        let entity = kg.describe_entity("desc").unwrap();
2853        assert_eq!(entity.name, "desc");
2854    }
2855
2856    #[test]
2857    fn test_entity_type_counts() {
2858        let kg = new_kg();
2859        kg.create_entities(&[
2860            Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2861            Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2862            Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2863        ]).unwrap();
2864
2865        let counts = kg.entity_type_counts();
2866        let map: FxHashMap<_, _> = counts.into_iter().collect();
2867        assert_eq!(map.get("person"), Some(&2));
2868        assert_eq!(map.get("place"), Some(&1));
2869    }
2870
2871    #[test]
2872    fn test_relation_type_counts() {
2873        let kg = new_kg();
2874        kg.create_entities(&[
2875            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2876            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2877            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2878        ]).unwrap();
2879
2880        kg.create_relations(&[
2881            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2882            Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2883        ]).unwrap();
2884
2885        let counts = kg.relation_type_counts();
2886        let map: FxHashMap<_, _> = counts.into_iter().collect();
2887        assert_eq!(map.get("knows"), Some(&2));
2888    }
2889
2890    #[test]
2891    fn test_upsert_entities() {
2892        let kg = new_kg();
2893        kg.create_entities(&[Entity {
2894            name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2895        }]).unwrap();
2896
2897        // Upsert with new type and additional observation.
2898        kg.upsert_entities(&[Entity {
2899            name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2900        }]).unwrap();
2901
2902        let ent = kg.get_entity("u").unwrap().unwrap();
2903        assert_eq!(ent.entity_type, "new");
2904        assert!(ent.observations.contains(&"added".into()));
2905        assert!(ent.observations.contains(&"existing".into()));
2906    }
2907
2908    #[test]
2909    fn test_merge_entities() {
2910        let kg = new_kg();
2911        kg.create_entities(&[
2912            Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2913            Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2914        ]).unwrap();
2915
2916        kg.create_relations(&[Relation {
2917            from: "source".into(), to: "target".into(), relation_type: "e".into(),
2918        }]).unwrap();
2919
2920        let merged = kg.merge_entities("source", "target").unwrap();
2921        assert_eq!(merged.name, "target");
2922        assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2923    }
2924
2925    #[test]
2926    fn test_find_all_paths() {
2927        let kg = new_kg();
2928        kg.create_entities(&[
2929            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2930            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2931            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2932        ]).unwrap();
2933
2934        kg.create_relations(&[
2935            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2936            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2937            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2938        ]).unwrap();
2939
2940        let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2941        assert!(paths.len() >= 2);
2942    }
2943
2944    #[test]
2945    fn test_batch_get_entities() {
2946        let kg = new_kg();
2947        kg.create_entities(&[
2948            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2949            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2950        ]).unwrap();
2951
2952        let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2953        assert_eq!(results.len(), 3);
2954        assert!(results[0].is_some());
2955        assert!(results[1].is_none());
2956        assert!(results[2].is_some());
2957    }
2958
2959    #[test]
2960    fn test_export_graph() {
2961        let kg = new_kg();
2962        kg.create_entities(&[Entity {
2963            name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2964        }]).unwrap();
2965
2966        let exported = kg.export("json", i64::MAX).unwrap();
2967        assert!(exported.contains("exp"));
2968        assert!(exported.contains("o"));
2969    }
2970
2971    #[test]
2972    fn test_graph_stats() {
2973        let kg = new_kg();
2974        assert_eq!(kg.get_entity_count().unwrap(), 0);
2975        assert_eq!(kg.get_relation_count().unwrap(), 0);
2976
2977        kg.create_entities(&[Entity {
2978            name: "s".into(), entity_type: "t".into(), observations: vec![],
2979        }]).unwrap();
2980
2981        assert_eq!(kg.get_entity_count().unwrap(), 1);
2982    }
2983
2984    #[test]
2985    fn test_read_graph_filtered() {
2986        let kg = new_kg();
2987        kg.create_entities(&[
2988            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2989            Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2990        ]).unwrap();
2991
2992        let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2993        let v: Value = serde_json::from_str(&out).unwrap();
2994        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2995        assert_eq!(v["entities"][0]["name"], "p1");
2996    }
2997
2998    #[test]
2999    fn test_wipe() {
3000        let kg = new_kg();
3001        kg.create_entities(&[Entity {
3002            name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
3003        }]).unwrap();
3004        assert_eq!(kg.get_entity_count().unwrap(), 1);
3005
3006        kg.wipe().unwrap();
3007        assert_eq!(kg.get_entity_count().unwrap(), 0);
3008    }
3009
3010    #[test]
3011    fn test_push_json_str() {
3012        let mut buf = String::new();
3013        push_json_str(&mut buf, "hello");
3014        assert_eq!(buf, "\"hello\"");
3015        let mut buf = String::new();
3016        push_json_str(&mut buf, "he\"llo");
3017        assert_eq!(buf, "\"he\\\"llo\"");
3018    }
3019
3020    // ── create_entities edge cases ────────────────────────────────────
3021
3022    #[test]
3023    fn test_create_entities_empty_input() {
3024        let kg = new_kg();
3025        let created = kg.create_entities(&[]).unwrap();
3026        assert!(created.is_empty());
3027    }
3028
3029    #[test]
3030    fn test_create_entities_skip_empty_name() {
3031        let kg = new_kg();
3032        let created = kg.create_entities(&[Entity {
3033            name: "".into(),
3034            entity_type: "t".into(),
3035            observations: vec![],
3036        }])
3037        .unwrap();
3038        assert!(created.is_empty());
3039        assert_eq!(kg.get_entity_count().unwrap(), 0);
3040    }
3041
3042    #[test]
3043    fn test_create_entities_duplicate_names() {
3044        let kg = new_kg();
3045        let e = Entity {
3046            name: "dup".into(),
3047            entity_type: "t".into(),
3048            observations: vec!["obs".into()],
3049        };
3050        let first = kg.create_entities(&[e.clone()]).unwrap();
3051        assert_eq!(first.len(), 1);
3052        let second = kg.create_entities(&[e.clone()]).unwrap();
3053        assert!(second.is_empty());
3054        assert_eq!(kg.get_entity_count().unwrap(), 1);
3055    }
3056
3057    #[test]
3058    fn test_create_entities_partial_duplicates() {
3059        let kg = new_kg();
3060        let created = kg.create_entities(&[
3061            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3062            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3063        ]).unwrap();
3064        assert_eq!(created.len(), 2);
3065
3066        let second = kg.create_entities(&[
3067            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3068            Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3069        ]).unwrap();
3070        assert_eq!(second.len(), 1); // only c created
3071        assert_eq!(second[0].name, "c");
3072        assert_eq!(kg.get_entity_count().unwrap(), 3);
3073    }
3074
3075    #[test]
3076    fn test_create_entities_mixed_empty_and_valid() {
3077        let kg = new_kg();
3078        let created = kg.create_entities(&[
3079            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3080            Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3081            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3082        ]).unwrap();
3083        assert_eq!(created.len(), 1);
3084        assert_eq!(created[0].name, "valid");
3085        assert_eq!(kg.get_entity_count().unwrap(), 1);
3086    }
3087
3088    #[test]
3089    fn test_create_entities_same_name_in_batch() {
3090        let kg = new_kg();
3091        let created = kg.create_entities(&[
3092            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3093            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3094        ]).unwrap();
3095        assert_eq!(created.len(), 1);
3096        assert_eq!(kg.get_entity_count().unwrap(), 1);
3097    }
3098
3099    // ── create_relations edge cases ───────────────────────────────────
3100
3101    #[test]
3102    fn test_create_relations_empty_input() {
3103        let kg = new_kg();
3104        let rels = kg.create_relations(&[]).unwrap();
3105        assert!(rels.is_empty());
3106    }
3107
3108    #[test]
3109    fn test_create_relations_nonexistent_from() {
3110        let kg = new_kg();
3111        kg.create_entities(&[Entity {
3112            name: "B".into(), entity_type: "t".into(), observations: vec![],
3113        }]).unwrap();
3114
3115        let rels = kg.create_relations(&[Relation {
3116            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3117        }]).unwrap();
3118        assert!(rels.is_empty());
3119        assert_eq!(kg.get_relation_count().unwrap(), 0);
3120    }
3121
3122    #[test]
3123    fn test_create_relations_nonexistent_to() {
3124        let kg = new_kg();
3125        kg.create_entities(&[Entity {
3126            name: "A".into(), entity_type: "t".into(), observations: vec![],
3127        }]).unwrap();
3128
3129        let rels = kg.create_relations(&[Relation {
3130            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3131        }]).unwrap();
3132        assert!(rels.is_empty());
3133        assert_eq!(kg.get_relation_count().unwrap(), 0);
3134    }
3135
3136    #[test]
3137    fn test_create_relations_both_nonexistent() {
3138        let kg = new_kg();
3139        let rels = kg.create_relations(&[Relation {
3140            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3141        }]).unwrap();
3142        assert!(rels.is_empty());
3143    }
3144
3145    #[test]
3146    fn test_create_relations_self_loop() {
3147        let kg = new_kg();
3148        kg.create_entities(&[Entity {
3149            name: "self".into(), entity_type: "t".into(), observations: vec![],
3150        }]).unwrap();
3151
3152        let rels = kg.create_relations(&[Relation {
3153            from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3154        }]).unwrap();
3155        assert_eq!(rels.len(), 1);
3156        assert_eq!(kg.get_relation_count().unwrap(), 1);
3157        assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3158        assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3159    }
3160
3161    #[test]
3162    fn test_create_relations_duplicate() {
3163        let kg = new_kg();
3164        kg.create_entities(&[
3165            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3166            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3167        ]).unwrap();
3168
3169        let r = Relation {
3170            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3171        };
3172        let first = kg.create_relations(&[r.clone()]).unwrap();
3173        assert_eq!(first.len(), 1);
3174
3175        let second = kg.create_relations(&[r.clone()]).unwrap();
3176        assert!(second.is_empty());
3177        assert_eq!(kg.get_relation_count().unwrap(), 1);
3178    }
3179
3180    #[test]
3181    fn test_create_relations_new_type_auto_created() {
3182        let kg = new_kg();
3183        kg.create_entities(&[
3184            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3185            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3186        ]).unwrap();
3187
3188        let rels = kg.create_relations(&[Relation {
3189            from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3190        }]).unwrap();
3191        assert_eq!(rels.len(), 1);
3192
3193        let counts = kg.relation_type_counts();
3194        let map: FxHashMap<_, _> = counts.into_iter().collect();
3195        assert_eq!(map.get("brand_new_type"), Some(&1));
3196    }
3197
3198    #[test]
3199    fn test_create_relations_degree_updates() {
3200        let kg = new_kg();
3201        kg.create_entities(&[
3202            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3203            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3204            Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3205        ]).unwrap();
3206
3207        kg.create_relations(&[
3208            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3209            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3210        ]).unwrap();
3211
3212        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3213        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3214        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3215        assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3216        assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3217    }
3218
3219    #[test]
3220    fn test_create_relations_delete_and_recreate() {
3221        let kg = new_kg();
3222        kg.create_entities(&[
3223            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3224            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3225        ]).unwrap();
3226
3227        let r = Relation {
3228            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3229        };
3230        kg.create_relations(&[r.clone()]).unwrap();
3231        assert_eq!(kg.get_relation_count().unwrap(), 1);
3232
3233        kg.delete_relations(&[r.clone()]).unwrap();
3234        assert_eq!(kg.get_relation_count().unwrap(), 0);
3235
3236        // Recreate after delete
3237        let re = kg.create_relations(&[r.clone()]).unwrap();
3238        assert_eq!(re.len(), 1);
3239        assert_eq!(kg.get_relation_count().unwrap(), 1);
3240    }
3241
3242    // ── Integration edge cases ────────────────────────────────────────
3243
3244    #[test]
3245    fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3246        let kg = new_kg();
3247        kg.create_entities(&[
3248            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3249            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3250        ]).unwrap();
3251        kg.create_relations(&[
3252            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3253        ]).unwrap();
3254
3255        assert_eq!(kg.get_relation_count().unwrap(), 1);
3256
3257        // Deleting entity A should also delete the relation
3258        kg.delete_entities(&["A".into()]).unwrap();
3259        assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3260        assert_eq!(kg.get_relation_count().unwrap(), 0);
3261    }
3262
3263    #[test]
3264    fn test_graph_stats_after_entity_with_observations() {
3265        let kg = new_kg();
3266        kg.create_entities(&[Entity {
3267            name: "stat".into(), entity_type: "t".into(),
3268            observations: vec!["o1".into(), "o2".into(), "o3".into()],
3269        }]).unwrap();
3270
3271        let ecount = kg.get_entity_count().unwrap();
3272        // graph_stat for observations is tracked but there's no public getter for it
3273        assert_eq!(ecount, 1);
3274
3275        // delete reverts stats
3276        kg.delete_entities(&["stat".into()]).unwrap();
3277        assert_eq!(kg.get_entity_count().unwrap(), 0);
3278    }
3279
3280    // ── Helpers for the fix-specific suites ────────────────────────────────
3281
3282    fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3283        use std::sync::atomic::AtomicU64;
3284        static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3285        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3286        let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3287        cleanup_db(&path);
3288        let kg = GraphHandle::new(
3289            &path,
3290            Durability::Async,
3291            SqliteTuning::default(),
3292            NonZeroUsize::new(10_000).unwrap(),
3293            read_pool_size,
3294        )
3295        .expect("create KG");
3296        TestKg(kg, path)
3297    }
3298
3299    fn seed_line(kg: &GraphHandle, n: usize) {
3300        let entities: Vec<Entity> = (0..n)
3301            .map(|i| Entity {
3302                name: format!("n{i}"),
3303                entity_type: "node".into(),
3304                observations: vec![format!("obs of n{i}")],
3305            })
3306            .collect();
3307        kg.create_entities(&entities).unwrap();
3308        let rels: Vec<Relation> = (0..n.saturating_sub(1))
3309            .map(|i| Relation {
3310                from: format!("n{i}"),
3311                to: format!("n{}", i + 1),
3312                relation_type: "edge".into(),
3313            })
3314            .collect();
3315        if !rels.is_empty() {
3316            kg.create_relations(&rels).unwrap();
3317        }
3318    }
3319
3320    fn count_relations(graph_json: &str) -> usize {
3321        let v: Value = serde_json::from_str(graph_json).unwrap();
3322        v["relations"].as_array().unwrap().len()
3323    }
3324
3325    fn count_entities(graph_json: &str) -> usize {
3326        let v: Value = serde_json::from_str(graph_json).unwrap();
3327        v["entities"].as_array().unwrap().len()
3328    }
3329
3330    // ── Fix #1: reader pool / concurrency ──────────────────────────────────
3331
3332    #[test]
3333    fn test_pool_size_one_still_works() {
3334        let kg = new_kg_with_pool(1);
3335        seed_line(&kg, 5);
3336        assert_eq!(kg.get_entity_count().unwrap(), 5);
3337        assert!(kg.get_entity("n2").unwrap().is_some());
3338        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3339        assert_eq!(count_entities(&g), 5);
3340    }
3341
3342    #[test]
3343    fn test_reads_see_committed_writes() {
3344        // A read on a pool connection must observe a just-committed write made on
3345        // the writer connection (WAL visibility).
3346        let kg = new_kg_with_pool(4);
3347        kg.create_entities(&[Entity {
3348            name: "fresh".into(),
3349            entity_type: "t".into(),
3350            observations: vec!["v".into()],
3351        }])
3352        .unwrap();
3353        // get_entity goes through the reader pool.
3354        let got = kg.get_entity("fresh").unwrap().unwrap();
3355        assert_eq!(got.observations, vec!["v"]);
3356    }
3357
3358    #[test]
3359    fn test_concurrent_readers_consistent() {
3360        // Many readers hammering the pool while the writer mutates must never
3361        // panic, deadlock, or observe a torn graph. The final counts must match.
3362        let kg = new_kg_with_pool(4);
3363        seed_line(&kg, 50);
3364
3365        std::thread::scope(|s| {
3366            // 8 reader threads.
3367            for _ in 0..8 {
3368                s.spawn(|| {
3369                    for _ in 0..200 {
3370                        let _ = kg.get_entity("n10");
3371                        let _ = kg.search_nodes_filtered("obs", None, 0, 10);
3372                        let _ = kg.read_graph_filtered(None, 0, 100);
3373                        let _ = kg.get_entity_count();
3374                        let _ = kg.neighbors("n10", Direction::Both, None, 2);
3375                    }
3376                });
3377            }
3378            // 1 writer thread adding more entities concurrently.
3379            s.spawn(|| {
3380                for i in 100..160 {
3381                    kg.create_entities(&[Entity {
3382                        name: format!("w{i}"),
3383                        entity_type: "node".into(),
3384                        observations: vec![format!("w obs {i}")],
3385                    }])
3386                    .unwrap();
3387                }
3388            });
3389        });
3390
3391        // 50 seeded + 60 written.
3392        assert_eq!(kg.get_entity_count().unwrap(), 110);
3393        assert!(kg.get_entity("w159").unwrap().is_some());
3394    }
3395
3396    #[test]
3397    fn test_reader_pool_rejects_writes_internally() {
3398        // Sanity: query_only readers cannot mutate. We can't call a write through
3399        // the pool directly, but we can confirm a read method that *would* have
3400        // inserted (search_relations resolving a missing type) does not create a
3401        // phantom type — see the dedicated test below — and that reads under a
3402        // size-1 pool serialize correctly without deadlock.
3403        let kg = new_kg_with_pool(1);
3404        seed_line(&kg, 3);
3405        std::thread::scope(|s| {
3406            for _ in 0..4 {
3407                s.spawn(|| {
3408                    for _ in 0..100 {
3409                        let _ = kg.read_graph_filtered(None, 0, 10);
3410                    }
3411                });
3412            }
3413        });
3414        assert_eq!(kg.get_entity_count().unwrap(), 3);
3415    }
3416
3417    // ── Fix #6: read_graph relation scoping + export bound ─────────────────
3418
3419    #[test]
3420    fn test_read_graph_relations_scoped_to_page() {
3421        let kg = new_kg_with_pool(2);
3422        // n0 -> n1 -> n2 -> n3 (4 entities, 3 edges).
3423        seed_line(&kg, 4);
3424
3425        // Full page: all 3 edges present.
3426        let full = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3427        assert_eq!(count_entities(&full), 4);
3428        assert_eq!(count_relations(&full), 3);
3429
3430        // Page of only the first entity (n0): its only edge n0->n1 has an
3431        // endpoint (n1) outside the page, so no relations are returned.
3432        let page1 = kg.read_graph_filtered(None, 0, 1).unwrap();
3433        assert_eq!(count_entities(&page1), 1);
3434        assert_eq!(count_relations(&page1), 0);
3435
3436        // Page of first two entities (n0, n1): edge n0->n1 fully inside, n1->n2
3437        // straddles the boundary and is excluded.
3438        let page2 = kg.read_graph_filtered(None, 0, 2).unwrap();
3439        assert_eq!(count_entities(&page2), 2);
3440        assert_eq!(count_relations(&page2), 1);
3441    }
3442
3443    #[test]
3444    fn test_read_graph_pagination_offset() {
3445        let kg = new_kg_with_pool(2);
3446        seed_line(&kg, 5);
3447        let g = kg.read_graph_filtered(None, 2, 2).unwrap();
3448        assert_eq!(count_entities(&g), 2);
3449        // Entities are ordered by id; offset 2 skips n0, n1.
3450        assert!(!g.contains("\"n0\""));
3451        assert!(!g.contains("\"n1\""));
3452        assert!(g.contains("\"n2\""));
3453        assert!(g.contains("\"n3\""));
3454    }
3455
3456    #[test]
3457    fn test_read_graph_empty() {
3458        let kg = new_kg_with_pool(2);
3459        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3460        assert_eq!(g, r#"{"entities":[],"relations":[]}"#);
3461    }
3462
3463    #[test]
3464    fn test_read_graph_filtered_by_type() {
3465        let kg = new_kg_with_pool(2);
3466        kg.create_entities(&[
3467            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
3468            Entity { name: "q1".into(), entity_type: "place".into(), observations: vec![] },
3469            Entity { name: "p2".into(), entity_type: "person".into(), observations: vec![] },
3470        ])
3471        .unwrap();
3472        let g = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
3473        assert_eq!(count_entities(&g), 2);
3474        assert!(g.contains("\"p1\""));
3475        assert!(g.contains("\"p2\""));
3476        assert!(!g.contains("\"q1\""));
3477    }
3478
3479    #[test]
3480    fn test_export_respects_max_rows() {
3481        let kg = new_kg_with_pool(2);
3482        seed_line(&kg, 5);
3483
3484        // Unbounded export returns everything.
3485        let full = kg.export("json", i64::MAX).unwrap();
3486        assert_eq!(count_entities(&full), 5);
3487        assert_eq!(count_relations(&full), 4);
3488
3489        // Capped export truncates both arrays to the cap.
3490        let capped = kg.export("json", 2).unwrap();
3491        assert_eq!(count_entities(&capped), 2);
3492        assert_eq!(count_relations(&capped), 2);
3493    }
3494
3495    #[test]
3496    fn test_export_negative_max_rows_is_unbounded() {
3497        let kg = new_kg_with_pool(2);
3498        seed_line(&kg, 3);
3499        // SQLite treats a negative LIMIT as "no limit".
3500        let out = kg.export("json", -1).unwrap();
3501        assert_eq!(count_entities(&out), 3);
3502    }
3503
3504    // ── Fix #8: writes remain correct without the per-write PRAGMA optimize ─
3505
3506    #[test]
3507    fn test_many_small_write_batches_stay_consistent() {
3508        let kg = new_kg_with_pool(2);
3509        for i in 0..100 {
3510            kg.create_entities(&[Entity {
3511                name: format!("e{i}"),
3512                entity_type: "t".into(),
3513                observations: vec![format!("o{i}")],
3514            }])
3515            .unwrap();
3516        }
3517        assert_eq!(kg.get_entity_count().unwrap(), 100);
3518        // Search must still find a needle inserted across many tiny batches,
3519        // proving FTS stayed consistent without per-write optimization.
3520        let hits = kg.search_nodes_filtered("e57", None, 0, 10);
3521        assert!(hits.iter().any(|e| e.name == "e57"));
3522    }
3523
3524    // ── Fix #9: wipe fully resets the FTS indexes ──────────────────────────
3525
3526    #[test]
3527    fn test_wipe_clears_name_and_obs_fts() {
3528        let kg = new_kg_with_pool(2);
3529        kg.create_entities(&[Entity {
3530            name: "Einstein".into(),
3531            entity_type: "scientist".into(),
3532            observations: vec!["physics".into()],
3533        }])
3534        .unwrap();
3535
3536        // Both FTS indexes resolve before the wipe.
3537        assert_eq!(kg.search_nodes_filtered("Einstein", None, 0, 10).len(), 1);
3538        assert_eq!(kg.search_nodes_filtered("physics", None, 0, 10).len(), 1);
3539
3540        kg.wipe().unwrap();
3541
3542        // After wipe both indexes must be empty (a bare DELETE on an
3543        // external-content FTS5 table would have left stale rowids behind).
3544        assert_eq!(kg.get_entity_count().unwrap(), 0);
3545        assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
3546        assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
3547    }
3548
3549    #[test]
3550    fn test_wipe_then_recreate_search_works() {
3551        // Recreating the same names after a wipe must produce a clean, searchable
3552        // index — not a corrupted one or duplicate FTS rows.
3553        let kg = new_kg_with_pool(2);
3554        kg.create_entities(&[Entity {
3555            name: "Einstein".into(),
3556            entity_type: "scientist".into(),
3557            observations: vec!["physics".into()],
3558        }])
3559        .unwrap();
3560        kg.wipe().unwrap();
3561
3562        kg.create_entities(&[Entity {
3563            name: "Einstein".into(),
3564            entity_type: "scientist".into(),
3565            observations: vec!["physics".into(), "relativity".into()],
3566        }])
3567        .unwrap();
3568
3569        let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
3570        assert_eq!(by_name.len(), 1, "exactly one Einstein after recreate");
3571        let by_obs = kg.search_nodes_filtered("relativity", None, 0, 10);
3572        assert_eq!(by_obs.len(), 1);
3573        assert_eq!(kg.get_entity_count().unwrap(), 1);
3574    }
3575
3576    // ── Read-only type/entity resolution (introduced by the reader pool) ───
3577
3578    #[test]
3579    fn test_search_relations_missing_type_returns_empty() {
3580        let kg = new_kg_with_pool(2);
3581        seed_line(&kg, 3); // edges of type "edge"
3582        // A filter for a relation type that does not exist must return nothing,
3583        // not every relation — and must not create a phantom type row.
3584        let r = kg.search_relations(None, None, Some("does_not_exist"), None);
3585        assert!(r.is_empty());
3586        // The phantom type must not have been inserted by the read.
3587        let types = kg.relation_type_counts();
3588        assert!(types.iter().all(|(t, _)| t != "does_not_exist"));
3589    }
3590
3591    #[test]
3592    fn test_search_relations_missing_from_returns_empty() {
3593        let kg = new_kg_with_pool(2);
3594        seed_line(&kg, 3);
3595        let r = kg.search_relations(Some("ghost"), None, None, None);
3596        assert!(r.is_empty(), "missing 'from' must not match every relation");
3597    }
3598
3599    #[test]
3600    fn test_search_relations_existing_filters_still_work() {
3601        let kg = new_kg_with_pool(2);
3602        seed_line(&kg, 3);
3603        let r = kg.search_relations(Some("n0"), None, Some("edge"), None);
3604        assert_eq!(r.len(), 1);
3605        assert_eq!(r[0].from, "n0");
3606        assert_eq!(r[0].to, "n1");
3607    }
3608
3609    #[test]
3610    fn test_neighbors_missing_type_returns_only_start() {
3611        let kg = new_kg_with_pool(2);
3612        seed_line(&kg, 3);
3613        let json = kg
3614            .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
3615            .unwrap();
3616        // No edge matches the bogus type, so only the start node comes back.
3617        assert_eq!(count_entities(&json), 1);
3618        assert_eq!(count_relations(&json), 0);
3619    }
3620
3621    #[test]
3622    fn test_neighbors_existing_type_filters() {
3623        let kg = new_kg_with_pool(2);
3624        kg.create_entities(&[
3625            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
3626            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
3627            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
3628        ])
3629        .unwrap();
3630        kg.create_relations(&[
3631            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
3632            Relation { from: "a".into(), to: "c".into(), relation_type: "likes".into() },
3633        ])
3634        .unwrap();
3635        let json = kg.neighbors("a", Direction::Outgoing, Some("knows"), 1).unwrap();
3636        assert!(json.contains("\"b\""));
3637        assert!(!json.contains("\"c\""));
3638        assert_eq!(count_relations(&json), 1);
3639    }
3640
3641    #[test]
3642    fn test_sqlite_tuning_applied_to_fresh_db() {
3643        use std::sync::atomic::AtomicU64;
3644        static COUNTER: AtomicU64 = AtomicU64::new(2_000_000);
3645        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3646        let path = std::env::temp_dir().join(format!("kg_tuning_{}_{}.db", std::process::id(), n));
3647        cleanup_db(&path);
3648
3649        let tuning = SqliteTuning {
3650            page_size: 8192,
3651            ..SqliteTuning::default()
3652        };
3653        let kg = TestKg(
3654            GraphHandle::new(&path, Durability::Async, tuning, NonZeroUsize::new(64).unwrap(), 2)
3655                .expect("create KG"),
3656            path.clone(),
3657        );
3658        kg.create_entities(&[Entity {
3659            name: "a".into(),
3660            entity_type: "n".into(),
3661            observations: vec!["o".into()],
3662        }])
3663        .unwrap();
3664
3665        // page_size (fresh-DB only) and auto_vacuum=INCREMENTAL must have taken
3666        // effect, and journal_mode must be WAL.
3667        let probe = Connection::open(&path).unwrap();
3668        let page_size: i64 = probe.query_row("PRAGMA page_size", [], |r| r.get(0)).unwrap();
3669        assert_eq!(page_size, 8192);
3670        let auto_vacuum: i64 = probe.query_row("PRAGMA auto_vacuum", [], |r| r.get(0)).unwrap();
3671        assert_eq!(auto_vacuum, 2, "expected INCREMENTAL auto_vacuum");
3672        let journal: String = probe.query_row("PRAGMA journal_mode", [], |r| r.get(0)).unwrap();
3673        assert_eq!(journal.to_lowercase(), "wal");
3674    }
3675
3676    #[test]
3677    fn test_checkpoint_passive_is_noop_safe() {
3678        let kg = new_kg();
3679        // On an empty / freshly-written DB a passive checkpoint must succeed.
3680        kg.checkpoint_passive().unwrap();
3681        kg.create_entities(&[Entity {
3682            name: "a".into(),
3683            entity_type: "n".into(),
3684            observations: vec!["o".into()],
3685        }])
3686        .unwrap();
3687        // And after a write, repeatedly, without error or deadlock.
3688        kg.checkpoint_passive().unwrap();
3689        kg.checkpoint_passive().unwrap();
3690        // Data is still readable afterwards.
3691        assert!(kg.get_entity("a").unwrap().is_some());
3692    }
3693}