Skip to main content

mcp_memory/
kg.rs

1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17    MCSError::IoError(std::io::Error::other(e))
18}
19
20const fn is_not_found(e: &rusqlite::Error) -> bool {
21    matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Wall-clock time in microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch the elapsed duration
/// defaults to zero, so the result is `0` rather than an error.
#[inline(always)]
fn now_us() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let micros = since_epoch.as_micros();
    micros as i64
}
31
/// 64-bit FNV-1a hash of the UTF-8 bytes of `name`, reinterpreted as `i64` so
/// it can be stored in a SQLite INTEGER column.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let digest = name
        .as_bytes()
        .iter()
        .fold(OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });
    // Bit-for-bit reinterpretation; the sign carries no meaning.
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43    let mut stmt = conn
44        .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45        .map_err(sqlite_err)?;
46    let rows = stmt
47        .query_map(params![entity_id], |row| row.get::<_, String>(0))
48        .map_err(sqlite_err)?
49        .filter_map(|r| r.ok())
50        .collect::<Vec<_>>();
51    Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55    load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59    let h = name_hash(name);
60    let mut stmt = conn
61        .prepare_cached(
62            "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63        )
64        .map_err(sqlite_err)?;
65    match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66        Ok(id) => Ok(Some(id)),
67        Err(e) if is_not_found(&e) => Ok(None),
68        Err(e) => Err(sqlite_err(e)),
69    }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73    let mut sel = conn
74        .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75        .map_err(sqlite_err)?;
76    if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77        return Ok(id);
78    }
79    conn.execute(
80        "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81        params![kind, type_name],
82    )
83    .map_err(sqlite_err)?;
84    Ok(conn.last_insert_rowid())
85}
86
87/// Read-only type lookup. Unlike [`get_type_id`] this never inserts, so it is
88/// safe to call on a `query_only` reader connection. Returns `None` when the
89/// type does not exist.
90fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91    conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92        .ok()?
93        .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94        .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98    conn.execute(
99        "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100        params![delta, type_id],
101    )
102    .map_err(sqlite_err)?;
103    Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107    conn.execute(
108        "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109        params![delta, key],
110    )
111    .map_err(sqlite_err)?;
112    Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116    conn.query_row(
117        "SELECT value FROM graph_stat WHERE key = ?1",
118        params![key],
119        |row| row.get(0),
120    )
121    .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125    conn.query_row(
126        "SELECT name FROM type_dict WHERE id = ?1",
127        params![type_id],
128        |row| row.get(0),
129    )
130    .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134    let mut stmt = conn
135        .prepare_cached(
136            "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137        )
138        .map_err(sqlite_err)?;
139    let rows = stmt
140        .query_map(params![kind], |row| {
141            Ok((
142                row.get::<_, String>(0)?,
143                row.get::<_, i64>(1)? as usize,
144            ))
145        })
146        .map_err(sqlite_err)?
147        .filter_map(|r| r.ok())
148        .collect();
149    Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153    let mut stmt = conn
154        .prepare_cached(
155            "SELECT e.name, t.name,
156               COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157             FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158        )
159        .map_err(sqlite_err)?;
160    let (name, etype, obs_json): (String, String, String) = stmt
161        .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162        .map_err(sqlite_err)?;
163    let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164    Ok(Entity {
165        name,
166        entity_type: etype,
167        observations,
168    })
169}
170
/// Which relation edges to follow from an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Follow outgoing edges only.
    Outgoing,
    /// Follow incoming edges only.
    Incoming,
    /// Follow edges in either direction.
    Both,
}

impl Direction {
    /// Parse an optional direction keyword.
    ///
    /// Only the exact, upper-case strings `"OUTGOING"` and `"INCOMING"` are
    /// recognised; anything else, including `None`, maps to
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        if let Some(keyword) = s {
            if keyword == "OUTGOING" {
                return Self::Outgoing;
            }
            if keyword == "INCOMING" {
                return Self::Incoming;
            }
        }
        Self::Both
    }
}
188
/// Append `raw` to `buf` as a quoted, escaped JSON string literal.
///
/// Writes straight into the caller's buffer, so no temporary
/// `serde_json::Value` is needed for the JSON-RPC wrapper.
///
/// `"` and `\` are backslash-escaped. `\n`, `\r`, `\t`, backspace and form
/// feed use their short escapes, and every other control character below
/// 0x20 is written as `\u00XX`. All other bytes, including multi-byte UTF-8
/// sequences, are copied through unchanged.
///
/// Everything is handled in a single pass. A two-pass scheme (short escapes
/// first, `\u` escapes afterwards) would flush a control character unescaped
/// whenever it precedes a later short escape, producing invalid JSON.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the pending run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        let short: Option<char> = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            // Remaining control characters have no short form.
            0x00..=0x1F => None,
            _ => continue,
        };
        // Every escaped byte is ASCII, so `start` and `i` always fall on
        // UTF-8 character boundaries and the slice cannot panic.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Append `b` as a six-character `\u00XX` JSON escape (lower-case hex).
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    write!(buf, "\\u{:04x}", b).expect("formatting into a String cannot fail");
}
229
230// ── MetaCache ────────────────────────────────────────────────────────────
231
/// Per-entity values kept in the in-memory LRU cache, mirroring the
/// identically named columns of the `entity` table.
#[derive(Clone, Copy)]
struct EntityMeta {
    /// Row id in `entity`.
    id: i64,
    /// Row id of the entity's type in `type_dict`.
    type_id: i64,
    /// Value of the `obs_count` column.
    obs_count: i64,
    /// Value of the `out_deg` column.
    out_deg: i64,
    /// Value of the `in_deg` column.
    in_deg: i64,
}
240
241// ── Transaction guard (RAII rollback on error) ─────────────────────────
242
243struct TxGuard<'a> {
244    conn: &'a Connection,
245    done: bool,
246}
247
248impl<'a> TxGuard<'a> {
249    fn begin(conn: &'a Connection) -> Result<Self> {
250        // BEGIN IMMEDIATE acquires the WAL write lock up front rather than
251        // lazily on the first write. This makes the busy-timeout apply to lock
252        // acquisition deterministically and avoids `SQLITE_BUSY_SNAPSHOT`
253        // surprises when readers are concurrently active.
254        conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255        Ok(Self { conn, done: false })
256    }
257
258    fn commit(mut self) -> Result<()> {
259        self.done = true;
260        self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261    }
262}
263
264impl Drop for TxGuard<'_> {
265    fn drop(&mut self) {
266        if !self.done {
267            let _ = self.conn.execute_batch("ROLLBACK");
268        }
269    }
270}
271
272// ── Reader pool ───────────────────────────────────────────────────────────
273
274/// A small fixed pool of `query_only` SQLite connections used for read
275/// operations. WAL mode permits any number of concurrent readers alongside the
276/// single writer, so spreading reads across several connections lets them run
277/// in parallel instead of serializing on the writer's mutex.
278struct ReaderPool {
279    conns: Vec<Mutex<Connection>>,
280    next: AtomicUsize,
281}
282
283impl ReaderPool {
284    /// Acquire a reader connection. Fast path: grab the first idle one. If every
285    /// connection is busy, block on a round-robin pick so callers still make
286    /// progress (and never spin).
287    fn get(&self) -> MutexGuard<'_, Connection> {
288        for c in &self.conns {
289            if let Some(g) = c.try_lock() {
290                return g;
291            }
292        }
293        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294        self.conns[i].lock()
295    }
296}
297
298// ── GraphHandle ──────────────────────────────────────────────────────────
299
300pub struct GraphHandle {
301    /// The single read-write connection. SQLite allows only one writer, so all
302    /// mutations serialize here.
303    writer: Mutex<Connection>,
304    /// Pool of `query_only` connections for concurrent reads (WAL).
305    readers: ReaderPool,
306    seq_entity: AtomicI64,
307    seq_obs: AtomicI64,
308    cache: Mutex<LruCache<String, EntityMeta>>,
309}
310
311/// Open one `query_only` reader connection against an existing WAL database.
312///
313/// The connection is opened read-write at the OS level (so it can attach to the
314/// `-shm` wal-index — SQLite cannot read a WAL database through a pure
315/// `SQLITE_OPEN_READ_ONLY` handle) and then locked to reads with
316/// `PRAGMA query_only = ON`, which makes any accidental write error out.
317fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
318    let conn = Connection::open_with_flags(
319        path,
320        OpenFlags::SQLITE_OPEN_READ_WRITE
321            | OpenFlags::SQLITE_OPEN_NO_MUTEX
322            | OpenFlags::SQLITE_OPEN_URI,
323    )
324    .map_err(sqlite_err)?;
325    conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
326        .map_err(sqlite_err)?;
327    conn.execute_batch(&format!(
328        "PRAGMA query_only   = ON;
329         PRAGMA cache_size   = -{};
330         PRAGMA temp_store   = MEMORY;
331         PRAGMA mmap_size    = {};",
332        tuning.cache_size_kb, tuning.mmap_size
333    ))
334    .map_err(sqlite_err)?;
335    Ok(conn)
336}
337
338impl GraphHandle {
339    pub fn new(
340        path: &Path,
341        durability: Durability,
342        tuning: SqliteTuning,
343        lru_cache_size: NonZeroUsize,
344        read_pool_size: usize,
345    ) -> Result<Self> {
346        let conn = Connection::open(path).map_err(sqlite_err)?;
347        // Apply the busy handler through the API so it is in force for every
348        // subsequent statement (including schema creation and BEGIN IMMEDIATE).
349        conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
350            .map_err(sqlite_err)?;
351
352        // `page_size` and `auto_vacuum` are fixed when the database first gets
353        // content, and `page_size` additionally must precede `journal_mode=WAL`.
354        // Set both up front on this connection, before any table is created, so
355        // they take effect on a fresh database. On an existing database they are
356        // silently ignored (would require VACUUM to change).
357        conn.execute_batch(&format!(
358            "PRAGMA page_size    = {};
359             PRAGMA auto_vacuum  = INCREMENTAL;",
360            tuning.page_size
361        ))
362        .map_err(sqlite_err)?;
363
364        conn.execute_batch(&format!(
365             "PRAGMA journal_mode = WAL;
366             PRAGMA foreign_keys = OFF;
367             PRAGMA cache_size    = -{};
368             PRAGMA temp_store    = MEMORY;
369             PRAGMA busy_timeout  = {};
370             PRAGMA synchronous   = NORMAL;
371             PRAGMA journal_size_limit = {};",
372            tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
373        ))
374        .map_err(sqlite_err)?;
375
376        conn.execute_batch(
377             "CREATE TABLE IF NOT EXISTS entity (
378                 id          INTEGER PRIMARY KEY,
379                 name_hash   INTEGER NOT NULL,
380                 name        TEXT    NOT NULL,
381                 type_id     INTEGER NOT NULL,
382                 obs_count   INTEGER NOT NULL DEFAULT 0,
383                 out_deg     INTEGER NOT NULL DEFAULT 0,
384                 in_deg      INTEGER NOT NULL DEFAULT 0,
385                 created_us  INTEGER NOT NULL,
386                 updated_us  INTEGER NOT NULL,
387                 flags       INTEGER NOT NULL DEFAULT 0
388             ) STRICT;
389
390             CREATE INDEX IF NOT EXISTS entity_by_hash
391                 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
392                 WHERE flags = 0;
393
394             CREATE INDEX IF NOT EXISTS entity_name_ci
395                 ON entity(lower(name))
396                 WHERE flags = 0;
397
398             CREATE TABLE IF NOT EXISTS observation (
399                 id          INTEGER PRIMARY KEY,
400                 entity_id   INTEGER NOT NULL,
401                 idx         INTEGER NOT NULL,
402                 body        TEXT    NOT NULL,
403                 created_us  INTEGER NOT NULL
404             ) STRICT;
405
406             CREATE INDEX IF NOT EXISTS obs_by_entity
407                 ON observation(entity_id, idx);
408
409             CREATE TABLE IF NOT EXISTS relation (
410                 from_id     INTEGER NOT NULL,
411                 to_id       INTEGER NOT NULL,
412                 type_id     INTEGER NOT NULL,
413                 created_us  INTEGER NOT NULL
414             ) STRICT;
415
416             CREATE INDEX IF NOT EXISTS rel_out
417                 ON relation(from_id, type_id, to_id);
418
419             CREATE INDEX IF NOT EXISTS rel_in
420                 ON relation(to_id, type_id, from_id);
421
422             CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
423                 USING fts5(name, content='entity', content_rowid='id',
424                            tokenize='unicode61 remove_diacritics 2');
425
426             CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
427                 USING fts5(body, content='observation', content_rowid='id',
428                            tokenize='unicode61 remove_diacritics 2');
429
430             CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
431               INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
432             END;
433
434             CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
435               INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
436             END;
437
438             CREATE TABLE IF NOT EXISTS type_dict (
439                 id     INTEGER PRIMARY KEY,
440                 kind   INTEGER NOT NULL,
441                 name   TEXT    NOT NULL,
442                 count  INTEGER NOT NULL DEFAULT 0
443             ) STRICT;
444
445             CREATE INDEX IF NOT EXISTS type_by_name
446                 ON type_dict(kind, name);
447
448             CREATE TABLE IF NOT EXISTS graph_stat (
449                 key    TEXT NOT NULL PRIMARY KEY,
450                 value  INTEGER NOT NULL
451             ) STRICT, WITHOUT ROWID;
452
453             CREATE TABLE IF NOT EXISTS hub_degree (
454                 entity_id INTEGER PRIMARY KEY,
455                 out_deg   INTEGER NOT NULL,
456                 in_deg    INTEGER NOT NULL
457             ) STRICT;
458
459             CREATE TABLE IF NOT EXISTS partition_map (
460                 table_name TEXT NOT NULL PRIMARY KEY,
461                 role       INTEGER NOT NULL,
462                 type_id    INTEGER,
463                 row_count  INTEGER NOT NULL DEFAULT 0
464             ) STRICT, WITHOUT ROWID;",
465        )
466        .map_err(sqlite_err)?;
467
468        conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
469            .map_err(sqlite_err)?;
470
471        let sync_pragma = match durability {
472            Durability::Sync => "PRAGMA synchronous = FULL",
473            Durability::Async => "PRAGMA synchronous = NORMAL",
474        };
475        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
476
477        // Bound the cost of `PRAGMA optimize` (here and in maintenance) so a
478        // large database cannot stall startup/maintenance analyzing every index.
479        conn.execute_batch("PRAGMA analysis_limit = 400;")
480            .map_err(sqlite_err)?;
481
482        let has_stat: bool = conn
483            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
484            .is_ok();
485        if !has_stat {
486            conn.execute_batch(
487                "INSERT INTO graph_stat(key, value) VALUES
488                 ('entities', 0), ('relations', 0), ('observations', 0),
489                 ('entity_seq', 0), ('obs_seq', 0);",
490            )
491            .map_err(sqlite_err)?;
492        }
493
494        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
495
496        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
497        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
498
499        // Open the reader pool against the now-initialized database. At least one
500        // reader is always created.
501        let pool_size = read_pool_size.max(1);
502        let mut conns = Vec::with_capacity(pool_size);
503        for _ in 0..pool_size {
504            conns.push(Mutex::new(open_reader(path, &tuning)?));
505        }
506        let readers = ReaderPool {
507            conns,
508            next: AtomicUsize::new(0),
509        };
510
511        Ok(Self {
512            writer: Mutex::new(conn),
513            readers,
514            seq_entity: AtomicI64::new(seq_entity),
515            seq_obs: AtomicI64::new(seq_obs),
516            cache: Mutex::new(LruCache::new(lru_cache_size)),
517        })
518    }
519
520    fn next_entity_id(&self) -> i64 {
521        self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
522    }
523
524    fn next_obs_id(&self) -> i64 {
525        self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
526    }
527
528    fn meta_get(&self, name: &str) -> Option<EntityMeta> {
529        self.cache.lock().get(name).copied()
530    }
531
532    fn meta_set(&self, name: &str, m: EntityMeta) {
533        self.cache.lock().put(name.to_string(), m);
534    }
535
536    fn meta_remove(&self, name: &str) {
537        self.cache.lock().pop(name);
538    }
539
540    fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
541        let mut cache = self.cache.lock();
542        if let Some(m) = cache.get_mut(name) {
543            f(m);
544        }
545    }
546
547    fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
548        if let Some(m) = self.meta_get(name) {
549            return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
550        }
551        let h = name_hash(name);
552        let mut stmt = conn
553            .prepare_cached(
554                "SELECT id, type_id, obs_count, out_deg, in_deg
555                 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
556            )
557            .map_err(sqlite_err)?;
558        match stmt.query_row(params![h, name], |row| {
559            Ok(EntityMeta {
560                id: row.get(0)?,
561                type_id: row.get(1)?,
562                obs_count: row.get(2)?,
563                out_deg: row.get(3)?,
564                in_deg: row.get(4)?,
565            })
566        }) {
567            Ok(m) => {
568                self.meta_set(name, m);
569                Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
570            }
571            Err(e) if is_not_found(&e) => Ok(None),
572            Err(e) => Err(sqlite_err(e)),
573        }
574    }
575
576    fn sync_seqs(&self, conn: &Connection) -> Result<()> {
577        let seq_e = self.seq_entity.load(Ordering::Relaxed);
578        let seq_o = self.seq_obs.load(Ordering::Relaxed);
579        conn.execute(
580            "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
581             WHERE key IN ('entity_seq', 'obs_seq')",
582            params![seq_e, seq_o],
583        )
584        .map_err(sqlite_err)?;
585        Ok(())
586    }
587
588    // ── Public API ──────────────────────────────────────────────────────
589
590    pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
591        if name.is_empty() {
592            return Ok(None);
593        }
594
595        if let Some(m) = self.meta_get(name) {
596            let conn = self.readers.get();
597            let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
598            let observations = load_observations_opt(&conn, m.id);
599            return Ok(Some(Entity {
600                name: name.to_string(),
601                entity_type: etype,
602                observations,
603            }));
604        }
605
606        let conn = self.readers.get();
607        let h = name_hash(name);
608        let mut stmt = conn
609            .prepare_cached(
610                "SELECT e.id, e.type_id, e.name, t.name,
611                        e.obs_count, e.out_deg, e.in_deg
612                 FROM entity e
613                 JOIN type_dict t ON t.id = e.type_id
614                 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
615            )
616            .map_err(sqlite_err)?;
617        match stmt.query_row(params![h, name], |row| {
618            let id: i64 = row.get(0)?;
619            let type_id: i64 = row.get(1)?;
620            let ename: String = row.get(2)?;
621            let etype: String = row.get(3)?;
622            let obs_count: i64 = row.get(4)?;
623            let out_deg: i64 = row.get(5)?;
624            let in_deg: i64 = row.get(6)?;
625            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
626        }) {
627            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
628                let observations = load_observations_opt(&conn, id);
629                drop(stmt);
630                drop(conn);
631                self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
632                Ok(Some(Entity {
633                    name: ename,
634                    entity_type: etype,
635                    observations,
636                }))
637            }
638            Err(e) if is_not_found(&e) => Ok(None),
639            Err(e) => Err(sqlite_err(e)),
640        }
641    }
642
643    pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
644        let conn = self.writer.lock();
645        let tx = TxGuard::begin(&conn)?;
646
647        let mut ins_ent = conn
648            .prepare_cached(
649                "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
650                 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
651                 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
652            )
653            .map_err(sqlite_err)?;
654
655        let mut ins_fts = conn
656            .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
657            .map_err(sqlite_err)?;
658
659        let batch_ts = now_us();
660        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
661        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
662        let mut total_entities: i64 = 0;
663        let mut total_obs: i64 = 0;
664        let mut created = Vec::new();
665        let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
666        let mut obs_sql = String::new();
667
668        for entity in entities {
669            if entity.name.is_empty() {
670                continue;
671            }
672            let h = name_hash(&entity.name);
673            let id = self.next_entity_id();
674            let type_id = match type_cache.get(entity.entity_type.as_str()) {
675                Some(t) => *t,
676                None => {
677                    let t = get_type_id(&conn, &entity.entity_type, 0)?;
678                    type_cache.insert(entity.entity_type.clone(), t);
679                    t
680                }
681            };
682            let obs_count = entity.observations.len() as i64;
683
684            let changed = ins_ent
685                .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
686                .map_err(sqlite_err)?;
687            if changed == 0 {
688                continue;
689            }
690
691            let n = entity.observations.len();
692            if n > 0 {
693                obs_sql.clear();
694
695                let mut oids = Vec::with_capacity(n);
696                let mut idxs = Vec::with_capacity(n);
697                for _ in 0..n {
698                    oids.push(self.next_obs_id());
699                }
700                for i in 0..n as i64 {
701                    idxs.push(i);
702                }
703
704                obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
705                for i in 0..n {
706                    if i > 0 { obs_sql.push(','); }
707                    obs_sql.push_str("(?,?,?,?,?)");
708                }
709
710                let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
711                for (i, obs) in entity.observations.iter().enumerate() {
712                    obs_params.push(&oids[i]);
713                    obs_params.push(&id);
714                    obs_params.push(&idxs[i]);
715                    obs_params.push(obs);
716                    obs_params.push(&batch_ts);
717                }
718
719                conn.execute(&obs_sql, obs_params.as_slice())
720                    .map_err(sqlite_err)?;
721            }
722
723            ins_fts
724                .execute(params![id, entity.name])
725                .map_err(sqlite_err)?;
726
727            *type_deltas.entry(type_id).or_insert(0) += 1;
728            total_entities += 1;
729            total_obs += obs_count;
730
731            created.push(entity.clone());
732            created_metas.push((entity.name.clone(), EntityMeta {
733                id,
734                type_id,
735                obs_count,
736                out_deg: 0,
737                in_deg: 0,
738            }));
739        }
740
741        if total_entities > 0 {
742            for (type_id, delta) in &type_deltas {
743                inc_type_count(&conn, *type_id, *delta)?;
744            }
745            inc_graph_stat(&conn, "entities", total_entities)?;
746            inc_graph_stat(&conn, "observations", total_obs)?;
747            self.sync_seqs(&conn)?;
748        }
749
750        tx.commit()?;
751
752        // Note: `PRAGMA optimize` is intentionally *not* run here. It analyzes
753        // indexes and writes to internal stat tables — pure overhead on the
754        // write hot path. The periodic `run_maintenance` task covers it.
755
756        if !created_metas.is_empty() {
757            let mut cache = self.cache.lock();
758            for (name, meta) in &created_metas {
759                cache.put(name.clone(), *meta);
760            }
761        }
762
763        Ok(created)
764    }
765
766    pub fn delete_entities(&self, names: &[String]) -> Result<()> {
767        if names.is_empty() {
768            return Ok(());
769        }
770        let conn = self.writer.lock();
771
772        // Phase 1: Resolve all names to (id, type_id).
773        let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
774        let mut sel = conn
775            .prepare_cached(
776                "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
777            )
778            .map_err(sqlite_err)?;
779        for name in names {
780            let h = name_hash(name);
781            let (id, type_id) = match sel.query_row(params![h, name], |row| {
782                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
783            }) {
784                Ok(v) => v,
785                Err(e) if is_not_found(&e) => continue,
786                Err(e) => return Err(sqlite_err(e)),
787            };
788            resolved.push((id, type_id, name.clone()));
789        }
790
791        if resolved.is_empty() {
792            return Ok(());
793        }
794
795        let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
796        let n = ids.len();
797
798        // Phase 2: Batch DELETE observations.
799        let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
800        let obs_sql = format!(
801            "DELETE FROM observation WHERE entity_id IN ({})",
802            obs_p.join(",")
803        );
804        let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
805        let obs_deleted = conn
806            .execute(&obs_sql, obs_refs.as_slice())
807            .map_err(sqlite_err)? as i64;
808
809        // Phase 3: Batch DELETE relations.
810        let rel_sql = format!(
811            "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
812            obs_p.join(","),
813            obs_p.join(",")
814        );
815        let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
816        let rel_deleted = conn
817            .execute(&rel_sql, rel_refs.as_slice())
818            .map_err(sqlite_err)? as i64;
819
820        // Phase 4: Batch FTS deletes.
821        let fts_values: Vec<String> = (0..n)
822            .map(|_| "('delete', ?, '')".to_string())
823            .collect();
824        let fts_sql = format!(
825            "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
826            fts_values.join(", ")
827        );
828        conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
829            .map_err(sqlite_err)?;
830
831        // Aggregate type count deltas.
832        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
833        for &(_, type_id, _) in &resolved {
834            *type_deltas.entry(type_id).or_insert(0) += 1;
835        }
836
837        // Phase 5: Batch type count decrements.
838        if !type_deltas.is_empty() {
839            let m = type_deltas.len();
840            let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
841            let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
842            let mut case_parts: Vec<String> = Vec::with_capacity(m);
843            let mut id_parts: Vec<String> = Vec::with_capacity(m);
844            for i in 0..m {
845                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
846                id_parts.push(format!("?{}", i + 1));
847            }
848            let sql = format!(
849                "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
850                case_parts.join(" "),
851                id_parts.join(","),
852            );
853            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
854            for id in &type_keys {
855                params.push(Box::new(*id));
856            }
857            for delta in &type_vals {
858                params.push(Box::new(*delta));
859            }
860            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
861            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
862        }
863
864        // Phase 6: Batch DELETE entities.
865        conn.execute(
866            &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
867            ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
868        )
869        .map_err(sqlite_err)?;
870
871        // Phase 7: Update stats.
872        inc_graph_stat(&conn, "entities", -(n as i64))?;
873        inc_graph_stat(&conn, "observations", -obs_deleted)?;
874        inc_graph_stat(&conn, "relations", -rel_deleted)?;
875
876        // Phase 8: Remove from cache.
877        for (_, _, name) in &resolved {
878            self.meta_remove(name);
879        }
880
881        Ok(())
882    }
883
884    /// Remove a `code:file` entity and every symbol it `defines`, so the file
885    /// can be re-indexed cleanly. Relations touching the removed entities are
886    /// cascaded by [`Self::delete_entities`]. Returns the number of entities
887    /// removed (file + symbols).
888    #[cfg(feature = "code")]
889    pub fn code_purge_file(&self, rel_path: &str) -> Result<usize> {
890        let defines = self.search_relations(Some(rel_path), None, Some("defines"));
891        let mut names: Vec<String> = defines.into_iter().map(|r| r.to).collect();
892        names.push(rel_path.to_string());
893        let n = names.len();
894        self.delete_entities(&names)?;
895        Ok(n)
896    }
897
898    pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
899        let conn = self.writer.lock();
900        let tx = TxGuard::begin(&conn)?;
901
902        let mut ins = conn
903            .prepare_cached(
904                "INSERT INTO relation (from_id, to_id, type_id, created_us)
905                 SELECT ?1, ?2, ?3, ?4
906                 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
907            )
908            .map_err(sqlite_err)?;
909
910        let ts = now_us();
911        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
912        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
913        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
914        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
915        let mut total_relations: i64 = 0;
916        let mut created = Vec::new();
917
918        for rel in relations {
919            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
920                Some(v) => v,
921                None => continue,
922            };
923            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
924                Some(v) => v,
925                None => continue,
926            };
927            let type_id = match type_cache.get(rel.relation_type.as_str()) {
928                Some(t) => *t,
929                None => {
930                    let t = get_type_id(&conn, &rel.relation_type, 1)?;
931                    type_cache.insert(rel.relation_type.clone(), t);
932                    t
933                }
934            };
935
936            let changed = ins
937                .execute(params![from_id, to_id, type_id, ts])
938                .map_err(sqlite_err)?;
939            if changed == 0 {
940                continue;
941            }
942
943            *out_deltas.entry(from_id).or_insert(0) += 1;
944            *in_deltas.entry(to_id).or_insert(0) += 1;
945            *type_deltas.entry(type_id).or_insert(0) += 1;
946            total_relations += 1;
947
948            created.push(rel.clone());
949        }
950
951        if total_relations > 0 {
952            for (id, delta) in &out_deltas {
953                conn.execute(
954                    "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
955                    params![delta, id],
956                )
957                .map_err(sqlite_err)?;
958            }
959            for (id, delta) in in_deltas {
960                conn.execute(
961                    "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
962                    params![delta, id],
963                )
964                .map_err(sqlite_err)?;
965            }
966            for (type_id, delta) in &type_deltas {
967                inc_type_count(&conn, *type_id, *delta)?;
968            }
969            inc_graph_stat(&conn, "relations", total_relations)?;
970        }
971
972        tx.commit()?;
973
974        // See `create_entities`: `PRAGMA optimize` is deferred to maintenance.
975
976        if !created.is_empty() {
977            let mut cache = self.cache.lock();
978            for rel in &created {
979                if let Some(m) = cache.get_mut(&rel.from) {
980                    m.out_deg += 1;
981                }
982                if let Some(m) = cache.get_mut(&rel.to) {
983                    m.in_deg += 1;
984                }
985            }
986        }
987
988        Ok(created)
989    }
990
991    pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
992        if relations.is_empty() {
993            return Ok(());
994        }
995        let conn = self.writer.lock();
996
997        // Resolve names to IDs and collect valid triples.
998        let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
999        let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
1000        for rel in relations {
1001            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
1002                Some(v) => v,
1003                None => continue,
1004            };
1005            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
1006                Some(v) => v,
1007                None => continue,
1008            };
1009            let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
1010                Ok(id) => id,
1011                Err(_) => continue,
1012            };
1013            triples.push((from_id, to_id, type_id));
1014            names.push((rel.from.clone(), rel.to.clone()));
1015        }
1016
1017        if triples.is_empty() {
1018            return Ok(());
1019        }
1020
1021        // Batch DELETE using VALUES subquery.
1022        let mut sql = String::from(
1023            "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1024        );
1025        for (i, _) in triples.iter().enumerate() {
1026            if i > 0 {
1027                sql.push_str(", ");
1028            }
1029            let base = (i * 3) + 1;
1030            sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1031        }
1032        sql.push(')');
1033
1034        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1035        for &(f, t, tp) in &triples {
1036            param_values.push(Box::new(f));
1037            param_values.push(Box::new(t));
1038            param_values.push(Box::new(tp));
1039        }
1040        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1041        let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1042        if total == 0 {
1043            return Ok(());
1044        }
1045
1046        // Aggregate degree and type deltas.
1047        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1048        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1049        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1050        for &(from_id, to_id, type_id) in &triples {
1051            *out_deltas.entry(from_id).or_insert(0) += 1;
1052            *in_deltas.entry(to_id).or_insert(0) += 1;
1053            *type_deltas.entry(type_id).or_insert(0) += 1;
1054        }
1055
1056        // Batch out_deg updates.
1057        let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1058        let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1059        if !out_keys.is_empty() {
1060            let m = out_keys.len();
1061            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1062            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1063            for i in 0..m {
1064                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1065                id_parts.push(format!("?{}", i + 1));
1066            }
1067            let sql = format!(
1068                "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1069                case_parts.join(" "),
1070                id_parts.join(","),
1071            );
1072            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1073            for id in &out_keys {
1074                params.push(Box::new(*id));
1075            }
1076            for delta in &out_vals {
1077                params.push(Box::new(*delta));
1078            }
1079            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1080            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1081        }
1082
1083        // Batch in_deg updates.
1084        let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1085        let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1086        if !in_keys.is_empty() {
1087            let m = in_keys.len();
1088            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1089            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1090            for i in 0..m {
1091                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1092                id_parts.push(format!("?{}", i + 1));
1093            }
1094            let sql = format!(
1095                "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1096                case_parts.join(" "),
1097                id_parts.join(","),
1098            );
1099            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1100            for id in &in_keys {
1101                params.push(Box::new(*id));
1102            }
1103            for delta in &in_vals {
1104                params.push(Box::new(*delta));
1105            }
1106            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1107            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1108        }
1109
1110        // Batch type_dict updates.
1111        let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1112        let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1113        if !type_keys.is_empty() {
1114            let m = type_keys.len();
1115            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1116            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1117            for i in 0..m {
1118                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1119                id_parts.push(format!("?{}", i + 1));
1120            }
1121            let sql = format!(
1122                "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1123                case_parts.join(" "),
1124                id_parts.join(","),
1125            );
1126            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1127            for id in &type_keys {
1128                params.push(Box::new(*id));
1129            }
1130            for delta in &type_vals {
1131                params.push(Box::new(*delta));
1132            }
1133            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1134            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1135        }
1136
1137        inc_graph_stat(&conn, "relations", -(total as i64))?;
1138
1139        // Update cache for resolved triples (self-heals on next reload if
1140        // a triple happened to not match).
1141        for (from, to) in &names {
1142            self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1143            self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1144        }
1145
1146        Ok(())
1147    }
1148
1149    pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1150        let conn = self.writer.lock();
1151        let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1152            Some(v) => v,
1153            None => {
1154                return Err(MCSError::InvalidParams(format!(
1155                    "Entity '{entity_name}' not found"
1156                )))
1157            }
1158        };
1159
1160        let mut max_idx: i64 = conn
1161            .query_row(
1162                "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1163                params![id],
1164                |row| row.get(0),
1165            )
1166            .map_err(sqlite_err)?;
1167
1168        let ts = now_us();
1169        let mut ins_obs = conn
1170            .prepare_cached(
1171                "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1172            )
1173            .map_err(sqlite_err)?;
1174
1175        for content in contents {
1176            max_idx += 1;
1177            let oid = self.next_obs_id();
1178            ins_obs
1179                .execute(params![oid, id, max_idx, content, ts])
1180                .map_err(sqlite_err)?;
1181        }
1182        let added = contents.to_vec();
1183
1184        let count: i64 = contents.len() as i64;
1185        conn.execute(
1186            "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1187            params![count, ts, id],
1188        )
1189        .map_err(sqlite_err)?;
1190
1191        inc_graph_stat(&conn, "observations", count)?;
1192        self.sync_seqs(&conn)?;
1193
1194        self.meta_update(entity_name, |m| m.obs_count += count);
1195
1196        Ok(added)
1197    }
1198
1199    pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1200        if observations.is_empty() {
1201            return Ok(());
1202        }
1203        let conn = self.writer.lock();
1204        let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1205            Some(v) => v,
1206            None => {
1207                return Err(MCSError::InvalidParams(format!(
1208                    "Entity '{entity_name}' not found"
1209                )))
1210            }
1211        };
1212
1213        let placeholders: Vec<String> = (0..observations.len())
1214            .map(|i| format!("?{}", i + 2))
1215            .collect();
1216        let sql = format!(
1217            "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1218            placeholders.join(",")
1219        );
1220
1221        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1222        param_values.push(Box::new(id));
1223        for obs in observations {
1224            param_values.push(Box::new(obs.as_str()));
1225        }
1226        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1227        let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1228
1229        if removed > 0 {
1230            conn.execute(
1231                "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1232                params![removed, now_us(), id],
1233            )
1234            .map_err(sqlite_err)?;
1235            inc_graph_stat(&conn, "observations", -removed)?;
1236
1237            self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1238        }
1239
1240        Ok(())
1241    }
1242
1243    pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1244        let mut results = Vec::new();
1245        for entity in entities {
1246            if let Some(existing) = self.get_entity(&entity.name)? {
1247                // Update type if different.
1248                if existing.entity_type != entity.entity_type {
1249                    let conn = self.writer.lock();
1250                    let old_type_id = conn
1251                        .query_row(
1252                            "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1253                            params![name_hash(&entity.name), entity.name],
1254                            |row| row.get::<_, i64>(0),
1255                        )
1256                        .map_err(sqlite_err)?;
1257                    let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1258                    inc_type_count(&conn, old_type_id, -1)?;
1259                    inc_type_count(&conn, new_type_id, 1)?;
1260                    conn.execute(
1261                        "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1262                        params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1263                    )
1264                    .map_err(sqlite_err)?;
1265                    // Invalidate cache so subsequent get_entity reloads meta.
1266                    self.meta_remove(&entity.name);
1267                }
1268                // Merge observations (append new ones not already present).
1269                let existing_set: HashSet<&str> =
1270                    existing.observations.iter().map(|s| s.as_str()).collect();
1271                let to_add: Vec<String> = entity
1272                    .observations
1273                    .iter()
1274                    .filter(|o| !existing_set.contains(o.as_str()))
1275                    .cloned()
1276                    .collect();
1277                if !to_add.is_empty() {
1278                    self.add_observations(&entity.name, &to_add)?;
1279                }
1280                let updated = self
1281                    .get_entity(&entity.name)?
1282                    .unwrap_or(entity.clone());
1283                results.push(updated);
1284            } else {
1285                let c = self.create_entities(std::slice::from_ref(entity))?;
1286                if let Some(e) = c.into_iter().next() {
1287                    results.push(e);
1288                }
1289            }
1290        }
1291        Ok(results)
1292    }
1293
1294    pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1295        let conn = self.writer.lock();
1296        let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1297            Some(v) => v,
1298            None => {
1299                return Err(MCSError::InvalidParams(format!(
1300                    "Source entity '{source}' not found"
1301                )))
1302            }
1303        };
1304        let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1305            Some(v) => v,
1306            None => {
1307                return Err(MCSError::InvalidParams(format!(
1308                    "Target entity '{target}' not found"
1309                )))
1310            }
1311        };
1312
1313        if src_id == tgt_id {
1314            return self.get_entity(target)?.ok_or_else(|| {
1315                MCSError::InvalidParams("Target entity not found after merge".into())
1316            });
1317        }
1318
1319        // Move observations from source to target.
1320        let mut obs_count: i64 = 0;
1321        {
1322            let mut max_idx: i64 = conn
1323                .query_row(
1324                    "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1325                    params![tgt_id],
1326                    |row| row.get(0),
1327                )
1328                .map_err(sqlite_err)?;
1329            let mut sel_obs = conn
1330                .prepare_cached(
1331                    "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1332                )
1333                .map_err(sqlite_err)?;
1334            let mut upd_obs = conn
1335                .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1336                .map_err(sqlite_err)?;
1337            let rows: Vec<(i64, String)> = sel_obs
1338                .query_map(params![src_id], |row| {
1339                    Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1340                })
1341                .map_err(sqlite_err)?
1342                .filter_map(|r| r.ok())
1343                .collect();
1344            for (oid, _body) in &rows {
1345                max_idx += 1;
1346                upd_obs
1347                    .execute(params![tgt_id, max_idx, oid])
1348                    .map_err(sqlite_err)?;
1349                obs_count += 1;
1350            }
1351        }
1352
1353        // Move relations from source to target.
1354        conn.execute(
1355            "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1356            params![tgt_id, src_id],
1357        )
1358        .map_err(sqlite_err)?;
1359        conn.execute(
1360            "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1361            params![tgt_id, src_id],
1362        )
1363        .map_err(sqlite_err)?;
1364        // Delete orphaned relations that were updated by the above (the "OR IGNORE"
1365        // keeps the first, but we still have the original row with the old id? No —
1366        // UPDATE OR IGNORE won't remove. So we must delete any that still reference src_id.)
1367        conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1368            .map_err(sqlite_err)?;
1369        conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1370            .map_err(sqlite_err)?;
1371
1372        // Update degrees on target.
1373        let out_add: i64 = conn
1374            .query_row(
1375                "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1376                params![tgt_id],
1377                |row| row.get(0),
1378            )
1379            .map_err(sqlite_err)?;
1380        let in_add: i64 = conn
1381            .query_row(
1382                "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1383                params![tgt_id],
1384                |row| row.get(0),
1385            )
1386            .map_err(sqlite_err)?;
1387        conn.execute(
1388            "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1389            params![out_add, in_add, obs_count, now_us(), tgt_id],
1390        )
1391        .map_err(sqlite_err)?;
1392
1393        // Delete source entity.
1394        conn.execute(
1395            "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1396            params![src_id],
1397        )
1398        .map_err(sqlite_err)?;
1399        conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1400            .map_err(sqlite_err)?;
1401
1402        inc_graph_stat(&conn, "entities", -1)?;
1403        self.meta_remove(source);
1404
1405        // Reload target into cache.
1406        if let Ok(meta) = conn.query_row(
1407            "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1408            params![tgt_id],
1409            |row| {
1410                Ok(EntityMeta {
1411                    id: row.get(0)?,
1412                    type_id: row.get(1)?,
1413                    obs_count: row.get(2)?,
1414                    out_deg: row.get(3)?,
1415                    in_deg: row.get(4)?,
1416                })
1417            },
1418        ) {
1419            self.meta_set(target, meta);
1420        }
1421
1422        let (name, etype): (String, String) = conn
1423            .query_row(
1424                "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1425                params![tgt_id],
1426                |row| Ok((row.get(0)?, row.get(1)?)),
1427            )
1428            .map_err(sqlite_err)?;
1429        let observations = load_observations_opt(&conn, tgt_id);
1430
1431        Ok(Entity {
1432            name,
1433            entity_type: etype,
1434            observations,
1435        })
1436    }
1437
1438    pub fn search_nodes_filtered(
1439        &self,
1440        query: &str,
1441        filter_type: Option<&str>,
1442        offset: usize,
1443        limit: usize,
1444    ) -> Vec<Entity> {
1445        if query.is_empty() {
1446            return Vec::new();
1447        }
1448        let conn = self.readers.get();
1449
1450        // Single pass: collect IDs from name_fts then obs_fts, deduping with a set.
1451        let mut entity_ids: Vec<i64> = Vec::new();
1452        let mut seen: HashSet<i64> = HashSet::new();
1453
1454        if let Ok(mut stmt) = conn.prepare(
1455            "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1456        ) {
1457            let limit_i64 = (limit + offset) as i64;
1458            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1459                row.get::<_, i64>(0)
1460            }) {
1461                for row in rows.flatten() {
1462                    if seen.insert(row) {
1463                        entity_ids.push(row);
1464                    }
1465                }
1466            }
1467        }
1468
1469        if let Ok(mut stmt) = conn.prepare(
1470            "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1471             WHERE obs_fts MATCH ?1
1472             GROUP BY entity_id
1473             LIMIT ?2",
1474        ) {
1475            let limit_i64 = (limit + offset) as i64;
1476            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1477                row.get::<_, i64>(0)
1478            }) {
1479                for row in rows.flatten() {
1480                    if seen.insert(row) {
1481                        entity_ids.push(row);
1482                    }
1483                }
1484            }
1485        }
1486
1487        // Apply filter_type, offset, limit.
1488        let mut results = Vec::new();
1489        let mut count: usize = 0;
1490        for eid in entity_ids {
1491            if let Ok(entity) = entity_by_id(&conn, eid) {
1492                if let Some(ft) = filter_type
1493                    && !ft.is_empty() && entity.entity_type != ft {
1494                        continue;
1495                    }
1496                if count < offset {
1497                    count += 1;
1498                    continue;
1499                }
1500                if results.len() >= limit {
1501                    break;
1502                }
1503                results.push(entity);
1504                count += 1;
1505            }
1506        }
1507
1508        results
1509    }
1510
1511    pub fn read_graph_filtered(
1512        &self,
1513        filter_type: Option<&str>,
1514        offset: usize,
1515        limit: usize,
1516    ) -> Result<String> {
1517        let conn = self.readers.get();
1518
1519        let limit_sql: i64 = if limit == usize::MAX {
1520            -1
1521        } else {
1522            limit.min(i64::MAX as usize) as i64
1523        };
1524        let offset_sql: i64 = offset as i64;
1525
1526        // Resolve the requested page of entity ids first. Relations are then
1527        // scoped to edges whose *both* endpoints fall inside this page, which
1528        // keeps the response self-consistent (no dangling references to
1529        // entities that were paged out) and bounds the relation payload by the
1530        // page size instead of dumping every relation in the graph.
1531        let filter = filter_type.filter(|ft| !ft.is_empty());
1532        let ids: Vec<i64> = if let Some(ft) = filter {
1533            let mut stmt = conn
1534                .prepare_cached(
1535                    "SELECT e.id FROM entity e
1536                     WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1537                       AND e.flags = 0
1538                     ORDER BY e.id LIMIT ?2 OFFSET ?3",
1539                )
1540                .map_err(sqlite_err)?;
1541            stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1542                .map_err(sqlite_err)?
1543                .filter_map(|r| r.ok())
1544                .collect()
1545        } else {
1546            let mut stmt = conn
1547                .prepare_cached(
1548                    "SELECT e.id FROM entity e WHERE e.flags = 0
1549                     ORDER BY e.id LIMIT ?1 OFFSET ?2",
1550                )
1551                .map_err(sqlite_err)?;
1552            stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1553                .map_err(sqlite_err)?
1554                .filter_map(|r| r.ok())
1555                .collect()
1556        };
1557
1558        if ids.is_empty() {
1559            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1560        }
1561
1562        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1563
1564        let entities_json: String = {
1565            let sql = format!(
1566                "SELECT COALESCE(json_group_array(json_object(
1567                    'name', e.name,
1568                    'entityType', t.name,
1569                    'observations', COALESCE((
1570                        SELECT json_group_array(o.body ORDER BY o.idx)
1571                        FROM observation o WHERE o.entity_id = e.id
1572                    ), json('[]'))
1573                ) ORDER BY e.id), json('[]'))
1574                FROM entity e
1575                JOIN type_dict t ON t.id = e.type_id
1576                WHERE e.id IN ({placeholders}) AND e.flags = 0"
1577            );
1578            conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1579                row.get::<_, String>(0)
1580            })
1581            .map_err(sqlite_err)?
1582        };
1583
1584        let relations_json: String = {
1585            let sql = format!(
1586                "SELECT COALESCE(json_group_array(json_object(
1587                    'from', e1.name,
1588                    'to', e2.name,
1589                    'relationType', t.name
1590                )), json('[]'))
1591                FROM relation r
1592                JOIN entity e1 ON e1.id = r.from_id
1593                JOIN entity e2 ON e2.id = r.to_id
1594                JOIN type_dict t ON t.id = r.type_id
1595                WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1596                  AND e1.flags = 0 AND e2.flags = 0"
1597            );
1598            let all_params: Vec<&dyn ToSql> = ids
1599                .iter()
1600                .map(|id| id as &dyn ToSql)
1601                .chain(ids.iter().map(|id| id as &dyn ToSql))
1602                .collect();
1603            conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1604                .map_err(sqlite_err)?
1605        };
1606
1607        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1608        out.push_str("{\"entities\":");
1609        out.push_str(&entities_json);
1610        out.push_str(",\"relations\":");
1611        out.push_str(&relations_json);
1612        out.push('}');
1613        Ok(out)
1614    }
1615
1616    pub fn open_nodes(&self, names: &[String]) -> String {
1617        let conn = self.readers.get();
1618        let mut entity_ids: Vec<i64> = Vec::new();
1619
1620        for name in names {
1621            let h = name_hash(name);
1622            if let Ok(Some(id)) = conn
1623                .query_row(
1624                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1625                    params![h, name],
1626                    |row| row.get::<_, i64>(0),
1627                )
1628                .map(Some)
1629                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1630            {
1631                entity_ids.push(id);
1632            }
1633        }
1634
1635        if entity_ids.is_empty() {
1636            return r#"{"entities":[],"relations":[]}"#.to_string();
1637        }
1638
1639        let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1640        let ids_str = placeholders.join(",");
1641
1642        let entities_json: String = {
1643            let sql = format!(
1644                "SELECT COALESCE(json_group_array(json_object(
1645                    'name', e.name,
1646                    'entityType', t.name,
1647                    'observations', COALESCE((
1648                        SELECT json_group_array(o.body ORDER BY o.idx)
1649                        FROM observation o WHERE o.entity_id = e.id
1650                    ), json('[]'))
1651                ) ORDER BY e.id), json('[]'))
1652                FROM entity e
1653                JOIN type_dict t ON t.id = e.type_id
1654                WHERE e.id IN ({ids_str}) AND e.flags = 0"
1655            );
1656            conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1657                row.get::<_, String>(0)
1658            })
1659            .unwrap_or_else(|_| "[]".to_string())
1660        };
1661
1662        let relations_json: String = {
1663            let sql = format!(
1664                "SELECT COALESCE(json_group_array(json_object(
1665                    'from', e1.name,
1666                    'to', e2.name,
1667                    'relationType', t.name
1668                )), json('[]'))
1669                FROM relation r
1670                JOIN entity e1 ON e1.id = r.from_id
1671                JOIN entity e2 ON e2.id = r.to_id
1672                JOIN type_dict t ON t.id = r.type_id
1673                WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1674                  AND e1.flags = 0 AND e2.flags = 0"
1675            );
1676            let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1677                .iter()
1678                .map(|id| id as &dyn rusqlite::types::ToSql)
1679                .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1680                .collect();
1681            let mut stmt = conn.prepare(&sql).unwrap();
1682            stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1683                .unwrap_or_else(|_| "[]".to_string())
1684        };
1685
1686        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1687        out.push_str("{\"entities\":");
1688        out.push_str(&entities_json);
1689        out.push_str(",\"relations\":");
1690        out.push_str(&relations_json);
1691        out.push('}');
1692        out
1693    }
1694
1695    pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1696        let conn = self.readers.get();
1697        let mut results = Vec::with_capacity(names.len());
1698        for name in names {
1699            let h = name_hash(name);
1700            let exists: bool = conn
1701                .query_row(
1702                    "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1703                    params![h, name],
1704                    |_| Ok(()),
1705                )
1706                .is_ok();
1707            results.push(exists);
1708        }
1709        Ok(results)
1710    }
1711
1712    pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1713        let conn = self.readers.get();
1714        let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1715            Some(v) => v,
1716            None => {
1717                return Err(MCSError::InvalidParams(format!(
1718                    "Entity '{name}' not found"
1719                )))
1720            }
1721        };
1722        Ok(match direction {
1723            Direction::Outgoing => out_d as usize,
1724            Direction::Incoming => in_d as usize,
1725            Direction::Both => (out_d + in_d) as usize,
1726        })
1727    }
1728
1729    pub fn get_entity_count(&self) -> Result<usize> {
1730        let conn = self.readers.get();
1731        read_graph_stat(&conn, "entities")
1732            .map(|v| v as usize)
1733            .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1734    }
1735
1736    pub fn get_relation_count(&self) -> Result<usize> {
1737        let conn = self.readers.get();
1738        read_graph_stat(&conn, "relations")
1739            .map(|v| v as usize)
1740            .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1741    }
1742
1743    pub fn search_relations(
1744        &self,
1745        from: Option<&str>,
1746        to: Option<&str>,
1747        rtype: Option<&str>,
1748    ) -> Vec<Relation> {
1749        let conn = self.readers.get();
1750        let mut results = Vec::new();
1751
1752        // A filter that is supplied but resolves to nothing uses the sentinel
1753        // id -1 (which matches no row), so the query returns empty rather than
1754        // silently dropping the filter and matching every relation. The lookups
1755        // are read-only — `get_type_id` would *insert* a phantom type, which is
1756        // both wrong and impossible on a `query_only` reader connection.
1757        let from_id = from
1758            .filter(|f| !f.is_empty())
1759            .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1760        let to_id = to
1761            .filter(|t| !t.is_empty())
1762            .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1763        let type_id = rtype
1764            .filter(|rt| !rt.is_empty())
1765            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1766
1767        match (from_id, to_id, type_id) {
1768            (Some(fid), Some(tid), Some(tpid)) => {
1769                if let Ok(mut stmt) = conn.prepare_cached(
1770                    "SELECT e1.name, e2.name, t.name
1771                     FROM relation r
1772                     JOIN entity e1 ON e1.id = r.from_id
1773                     JOIN entity e2 ON e2.id = r.to_id
1774                     JOIN type_dict t ON t.id = r.type_id
1775                     WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1776                       AND e1.flags = 0 AND e2.flags = 0
1777                     ORDER BY r.from_id, r.to_id"
1778                )
1779                    && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1780                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1781                    }) {
1782                        for row in rows.flatten() { results.push(row); }
1783                    }
1784            }
1785            (Some(fid), Some(tid), None) => {
1786                if let Ok(mut stmt) = conn.prepare_cached(
1787                    "SELECT e1.name, e2.name, t.name
1788                     FROM relation r
1789                     JOIN entity e1 ON e1.id = r.from_id
1790                     JOIN entity e2 ON e2.id = r.to_id
1791                     JOIN type_dict t ON t.id = r.type_id
1792                     WHERE r.from_id = ?1 AND r.to_id = ?2
1793                       AND e1.flags = 0 AND e2.flags = 0
1794                     ORDER BY r.from_id, r.to_id"
1795                )
1796                    && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1797                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1798                    }) {
1799                        for row in rows.flatten() { results.push(row); }
1800                    }
1801            }
1802            (Some(fid), None, Some(tpid)) => {
1803                if let Ok(mut stmt) = conn.prepare_cached(
1804                    "SELECT e1.name, e2.name, t.name
1805                     FROM relation r
1806                     JOIN entity e1 ON e1.id = r.from_id
1807                     JOIN entity e2 ON e2.id = r.to_id
1808                     JOIN type_dict t ON t.id = r.type_id
1809                     WHERE r.from_id = ?1 AND r.type_id = ?2
1810                       AND e1.flags = 0 AND e2.flags = 0
1811                     ORDER BY r.from_id, r.to_id"
1812                )
1813                    && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1814                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1815                    }) {
1816                        for row in rows.flatten() { results.push(row); }
1817                    }
1818            }
1819            (None, Some(tid), Some(tpid)) => {
1820                if let Ok(mut stmt) = conn.prepare_cached(
1821                    "SELECT e1.name, e2.name, t.name
1822                     FROM relation r
1823                     JOIN entity e1 ON e1.id = r.from_id
1824                     JOIN entity e2 ON e2.id = r.to_id
1825                     JOIN type_dict t ON t.id = r.type_id
1826                     WHERE r.to_id = ?1 AND r.type_id = ?2
1827                       AND e1.flags = 0 AND e2.flags = 0
1828                     ORDER BY r.from_id, r.to_id"
1829                )
1830                    && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1831                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1832                    }) {
1833                        for row in rows.flatten() { results.push(row); }
1834                    }
1835            }
1836            (Some(fid), None, None) => {
1837                if let Ok(mut stmt) = conn.prepare_cached(
1838                    "SELECT e1.name, e2.name, t.name
1839                     FROM relation r
1840                     JOIN entity e1 ON e1.id = r.from_id
1841                     JOIN entity e2 ON e2.id = r.to_id
1842                     JOIN type_dict t ON t.id = r.type_id
1843                     WHERE r.from_id = ?1
1844                       AND e1.flags = 0 AND e2.flags = 0
1845                     ORDER BY r.from_id, r.to_id"
1846                )
1847                    && let Ok(rows) = stmt.query_map(params![fid], |row| {
1848                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1849                    }) {
1850                        for row in rows.flatten() { results.push(row); }
1851                    }
1852            }
1853            (None, Some(tid), None) => {
1854                if let Ok(mut stmt) = conn.prepare_cached(
1855                    "SELECT e1.name, e2.name, t.name
1856                     FROM relation r
1857                     JOIN entity e1 ON e1.id = r.from_id
1858                     JOIN entity e2 ON e2.id = r.to_id
1859                     JOIN type_dict t ON t.id = r.type_id
1860                     WHERE r.to_id = ?1
1861                       AND e1.flags = 0 AND e2.flags = 0
1862                     ORDER BY r.from_id, r.to_id"
1863                )
1864                    && let Ok(rows) = stmt.query_map(params![tid], |row| {
1865                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1866                    }) {
1867                        for row in rows.flatten() { results.push(row); }
1868                    }
1869            }
1870            (None, None, Some(tpid)) => {
1871                if let Ok(mut stmt) = conn.prepare_cached(
1872                    "SELECT e1.name, e2.name, t.name
1873                     FROM relation r
1874                     JOIN entity e1 ON e1.id = r.from_id
1875                     JOIN entity e2 ON e2.id = r.to_id
1876                     JOIN type_dict t ON t.id = r.type_id
1877                     WHERE r.type_id = ?1
1878                       AND e1.flags = 0 AND e2.flags = 0
1879                     ORDER BY r.from_id, r.to_id"
1880                )
1881                    && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1882                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1883                    }) {
1884                        for row in rows.flatten() { results.push(row); }
1885                    }
1886            }
1887            (None, None, None) => {
1888                if let Ok(mut stmt) = conn.prepare_cached(
1889                    "SELECT e1.name, e2.name, t.name
1890                     FROM relation r
1891                     JOIN entity e1 ON e1.id = r.from_id
1892                     JOIN entity e2 ON e2.id = r.to_id
1893                     JOIN type_dict t ON t.id = r.type_id
1894                     WHERE e1.flags = 0 AND e2.flags = 0
1895                     ORDER BY r.from_id, r.to_id"
1896                )
1897                    && let Ok(rows) = stmt.query_map([], |row| {
1898                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1899                    }) {
1900                        for row in rows.flatten() { results.push(row); }
1901                    }
1902            }
1903        }
1904        results
1905    }
1906
1907    pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1908        let conn = self.readers.get();
1909        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1910            Some(v) => v,
1911            None => {
1912                return Err(MCSError::InvalidParams(format!(
1913                    "Source entity '{from}' not found"
1914                )))
1915            }
1916        };
1917        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1918            Some(v) => v,
1919            None => {
1920                return Err(MCSError::InvalidParams(format!(
1921                    "Target entity '{to}' not found"
1922                )))
1923            }
1924        };
1925
1926        if from_id == to_id {
1927            return Ok(Some(vec![from.to_string()]));
1928        }
1929
1930        // BFS with adjacency from relation table.
1931        let mut visited = HashSet::new();
1932        let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1933        let mut queue = VecDeque::new();
1934        visited.insert(from_id);
1935        queue.push_back(from_id);
1936
1937        while let Some(cur) = queue.pop_front() {
1938            if cur == to_id {
1939                break;
1940            }
1941            // Fetch out-neighbors.
1942            if let Ok(mut stmt) =
1943                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1944                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1945                    for row in rows.flatten() {
1946                        if visited.insert(row) {
1947                            parent.insert(row, cur);
1948                            queue.push_back(row);
1949                        }
1950                    }
1951                }
1952            // Also check in-neighbors (undirected traversal).
1953            if let Ok(mut stmt) =
1954                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1955                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1956                    for row in rows.flatten() {
1957                        if visited.insert(row) {
1958                            parent.insert(row, cur);
1959                            queue.push_back(row);
1960                        }
1961                    }
1962                }
1963        }
1964
1965        if !parent.contains_key(&to_id) && to_id != from_id {
1966            return Ok(None);
1967        }
1968
1969        let mut path = Vec::new();
1970        let mut cur = to_id;
1971        path.push(cur);
1972        while let Some(&p) = parent.get(&cur) {
1973            path.push(p);
1974            cur = p;
1975            if cur == from_id {
1976                break;
1977            }
1978        }
1979        path.reverse();
1980
1981        let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1982        let sql = format!(
1983            "SELECT id, name FROM entity WHERE id IN ({})",
1984            placeholders.join(",")
1985        );
1986        let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1987            && let Ok(rows) = stmt.query_map(
1988                rusqlite::params_from_iter(&path),
1989                |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1990            ) {
1991            rows.flatten().collect()
1992        } else {
1993            FxHashMap::default()
1994        };
1995
1996        let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
1997
1998        Ok(Some(name_path))
1999    }
2000
2001    pub fn compact(&self) -> Result<()> {
2002        let conn = self.writer.lock();
2003        conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
2004        Ok(())
2005    }
2006
2007    pub fn neighbors(
2008        &self,
2009        name: &str,
2010        direction: Direction,
2011        rtype: Option<&str>,
2012        depth: u32,
2013    ) -> Result<String> {
2014        self._traverse(name, direction, rtype, depth, true)
2015    }
2016
2017    pub fn extract_subgraph(
2018        &self,
2019        names: &[String],
2020        depth: u32,
2021    ) -> Result<String> {
2022        if names.is_empty() {
2023            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2024        }
2025
2026        let conn = self.readers.get();
2027        let mut all_entity_ids: HashSet<i64> = HashSet::new();
2028        let mut frontier: HashSet<i64> = HashSet::new();
2029        let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2030
2031        // Resolve seed entities.
2032        for name in names {
2033            let h = name_hash(name);
2034            if let Ok(Some(id)) = conn
2035                .query_row(
2036                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2037                    params![h, name],
2038                    |row| row.get::<_, i64>(0),
2039                )
2040                .map(Some)
2041                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2042            {
2043                all_entity_ids.insert(id);
2044                frontier.insert(id);
2045            }
2046        }
2047
2048        let mut current_depth = 0u32;
2049        while current_depth < depth && !frontier.is_empty() {
2050            let mut next_frontier: HashSet<i64> = HashSet::new();
2051
2052            // Collect relations for current frontier — batched IN queries
2053            // instead of one query per frontier entity.
2054            const CHUNK: usize = 500;
2055            let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2056            for chunk in frontier_ids.chunks(CHUNK) {
2057                let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2058                let in_clause = placeholders.join(",");
2059
2060                // Forward: from_id IN chunk.
2061                if let Ok(mut stmt) = conn.prepare(
2062                    &format!(
2063                        "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2064                    )
2065                )
2066                    && let Ok(rows) = stmt.query_map(
2067                        rusqlite::params_from_iter(chunk),
2068                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2069                    )
2070                {
2071                    for row in rows.flatten() {
2072                        let (from_id, to_id, type_id) = row;
2073                        all_rel_pairs.insert((from_id, to_id, type_id));
2074                        if all_entity_ids.insert(to_id) {
2075                            next_frontier.insert(to_id);
2076                        }
2077                    }
2078                }
2079
2080                // Backward: to_id IN chunk.
2081                if let Ok(mut stmt) = conn.prepare(
2082                    &format!(
2083                        "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2084                    )
2085                )
2086                    && let Ok(rows) = stmt.query_map(
2087                        rusqlite::params_from_iter(chunk),
2088                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2089                    )
2090                {
2091                    for row in rows.flatten() {
2092                        let (from_id, to_id, type_id) = row;
2093                        all_rel_pairs.insert((from_id, to_id, type_id));
2094                        if all_entity_ids.insert(from_id) {
2095                            next_frontier.insert(from_id);
2096                        }
2097                    }
2098                }
2099            }
2100            frontier = next_frontier;
2101            current_depth += 1;
2102        }
2103
2104        let entities_json: String = {
2105            if all_entity_ids.is_empty() {
2106                "[]".to_string()
2107            } else {
2108                let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2109                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2110                let sql = format!(
2111                    "SELECT COALESCE(json_group_array(json_object(
2112                        'name', e.name,
2113                        'entityType', t.name,
2114                        'observations', COALESCE((
2115                            SELECT json_group_array(o.body ORDER BY o.idx)
2116                            FROM observation o WHERE o.entity_id = e.id
2117                        ), json('[]'))
2118                    ) ORDER BY e.id), json('[]'))
2119                    FROM entity e
2120                    JOIN type_dict t ON t.id = e.type_id
2121                    WHERE e.id IN ({}) AND e.flags = 0",
2122                    placeholders.join(",")
2123                );
2124                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2125                    row.get::<_, String>(0)
2126                })
2127                .unwrap_or_else(|_| "[]".to_string())
2128            }
2129        };
2130
2131        let relations_json: String = {
2132            if all_rel_pairs.is_empty() {
2133                "[]".to_string()
2134            } else {
2135                let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2136                let sql = format!(
2137                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2138                    SELECT COALESCE(json_group_array(json_object(
2139                        'from', e1.name,
2140                        'to', e2.name,
2141                        'relationType', t.name
2142                    )), json('[]'))
2143                    FROM r
2144                    JOIN entity e1 ON e1.id = r.from_id
2145                    JOIN entity e2 ON e2.id = r.to_id
2146                    JOIN type_dict t ON t.id = r.type_id
2147                    WHERE e1.flags = 0 AND e2.flags = 0",
2148                    vals.join(", ")
2149                );
2150                let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2151                    .flat_map(|(f, t, tp)| {
2152                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2153                    })
2154                    .collect();
2155                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2156                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2157                    .unwrap_or_else(|_| "[]".to_string())
2158            }
2159        };
2160
2161        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2162        out.push_str("{\"entities\":");
2163        out.push_str(&entities_json);
2164        out.push_str(",\"relations\":");
2165        out.push_str(&relations_json);
2166        out.push('}');
2167        Ok(out)
2168    }
2169
2170    pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2171        self.get_entity(name)?
2172            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2173    }
2174
2175    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2176        let conn = self.readers.get();
2177        select_all_types(&conn, 0).unwrap_or_default()
2178    }
2179
2180    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2181        let conn = self.readers.get();
2182        select_all_types(&conn, 1).unwrap_or_default()
2183    }
2184
2185    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2186        names
2187            .iter()
2188            .map(|n| self.get_entity(n).unwrap_or(None))
2189            .collect()
2190    }
2191
2192    pub fn find_all_paths(
2193        &self,
2194        from: &str,
2195        to: &str,
2196        max_depth: usize,
2197        max_paths: usize,
2198    ) -> Result<Vec<Vec<String>>> {
2199        let conn = self.readers.get();
2200        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2201            Some(v) => v,
2202            None => {
2203                return Err(MCSError::InvalidParams(format!(
2204                    "Source entity '{from}' not found"
2205                )))
2206            }
2207        };
2208        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2209            Some(v) => v,
2210            None => {
2211                return Err(MCSError::InvalidParams(format!(
2212                    "Target entity '{to}' not found"
2213                )))
2214            }
2215        };
2216
2217        if from_id == to_id {
2218            return Ok(vec![vec![from.to_string()]]);
2219        }
2220
2221        // BFS enumerating all paths up to max_depth.
2222        let mut all_paths: Vec<Vec<i64>> = Vec::new();
2223        let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2224        queue.push_back((from_id, vec![from_id]));
2225
2226        const MAX_QUEUE_SIZE: usize = 10_000_000;
2227
2228        while let Some((cur, path)) = queue.pop_front() {
2229            if all_paths.len() >= max_paths {
2230                break;
2231            }
2232            if path.len() > max_depth {
2233                continue;
2234            }
2235
2236            // Out-neighbors.
2237            if let Ok(mut stmt) =
2238                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2239                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2240                    for next_id in rows.flatten() {
2241                        if next_id == to_id {
2242                            let mut full_path = path.clone();
2243                            full_path.push(next_id);
2244                            all_paths.push(full_path);
2245                            if all_paths.len() >= max_paths {
2246                                break;
2247                            }
2248                        } else if !path.contains(&next_id) && path.len() < max_depth {
2249                            if queue.len() >= MAX_QUEUE_SIZE {
2250                                return Err(MCSError::InvalidParams(
2251                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2252                                ));
2253                            }
2254                            let mut new_path = path.clone();
2255                            new_path.push(next_id);
2256                            queue.push_back((next_id, new_path));
2257                        }
2258                    }
2259                }
2260
2261            // In-neighbors (undirected).
2262            if let Ok(mut stmt) =
2263                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2264                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2265                    for next_id in rows.flatten() {
2266                        if next_id == to_id {
2267                            let mut full_path = path.clone();
2268                            full_path.push(next_id);
2269                            all_paths.push(full_path);
2270                            if all_paths.len() >= max_paths {
2271                                break;
2272                            }
2273                        } else if !path.contains(&next_id) && path.len() < max_depth {
2274                            if queue.len() >= MAX_QUEUE_SIZE {
2275                                return Err(MCSError::InvalidParams(
2276                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2277                                ));
2278                            }
2279                            let mut new_path = path.clone();
2280                            new_path.push(next_id);
2281                            queue.push_back((next_id, new_path));
2282                        }
2283                    }
2284                }
2285        }
2286
2287        // Convert ids to names — one batch query instead of N lookups per path.
2288        let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2289        let id_list: Vec<i64> = all_ids.into_iter().collect();
2290        let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2291            FxHashMap::default()
2292        } else {
2293            let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2294            let sql = format!(
2295                "SELECT id, name FROM entity WHERE id IN ({})",
2296                placeholders.join(",")
2297            );
2298            if let Ok(mut stmt) = conn.prepare(&sql)
2299                && let Ok(rows) = stmt.query_map(
2300                    rusqlite::params_from_iter(&id_list),
2301                    |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2302                ) {
2303                rows.flatten().collect()
2304            } else {
2305                FxHashMap::default()
2306            }
2307        };
2308
2309        let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2310        for path_ids in all_paths {
2311            let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2312            named_paths.push(named);
2313        }
2314
2315        Ok(named_paths)
2316    }
2317
2318    /// Export the whole graph as a JSON string. `max_rows` caps both the entity
2319    /// and relation arrays so a pathologically large graph cannot be coerced
2320    /// into an unbounded in-memory string (DoS guard); callers pass a generous
2321    /// constant. A negative value means "no limit".
2322    pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2323        let conn = self.readers.get();
2324        // Only JSON is supported; the format argument is accepted for forward
2325        // compatibility.
2326        conn.query_row(
2327            "SELECT json_object(
2328                'entities', COALESCE((
2329                    SELECT json_group_array(json_object(
2330                        'name', e.name,
2331                        'entityType', t.name,
2332                        'observations', COALESCE((
2333                            SELECT json_group_array(o.body ORDER BY o.idx)
2334                            FROM observation o WHERE o.entity_id = e.id
2335                        ), json('[]'))
2336                    ) ORDER BY e.id)
2337                    FROM (
2338                        SELECT id, name, type_id FROM entity
2339                        WHERE flags = 0 ORDER BY id LIMIT ?1
2340                    ) e
2341                    JOIN type_dict t ON t.id = e.type_id
2342                ), json('[]')),
2343                'relations', COALESCE((
2344                    SELECT json_group_array(json_object(
2345                        'from', e1.name,
2346                        'to', e2.name,
2347                        'relationType', t.name
2348                    ))
2349                    FROM (
2350                        SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2351                    ) r
2352                    JOIN entity e1 ON e1.id = r.from_id
2353                    JOIN entity e2 ON e2.id = r.to_id
2354                    JOIN type_dict t ON t.id = r.type_id
2355                    WHERE e1.flags = 0 AND e2.flags = 0
2356                ), json('[]'))
2357            )",
2358            params![max_rows],
2359            |row| row.get::<_, String>(0),
2360        )
2361        .map_err(sqlite_err)
2362    }
2363
2364    pub fn wipe(&self) -> Result<()> {
2365        let conn = self.writer.lock();
2366        // `name_fts`/`obs_fts` are external-content FTS5 tables; the supported
2367        // way to empty them is the special `'delete-all'` command, not a bare
2368        // `DELETE FROM` (which is invalid for external-content tables and can
2369        // leave the index inconsistent). Run it after clearing the content rows.
2370        conn.execute_batch(
2371            "DELETE FROM observation;
2372             DELETE FROM relation;
2373             DELETE FROM entity;
2374             DELETE FROM type_dict;
2375             INSERT INTO name_fts(name_fts) VALUES('delete-all');
2376             INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2377             UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2378             UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2379        )
2380        .map_err(sqlite_err)?;
2381        self.seq_entity.store(0, Ordering::Relaxed);
2382        self.seq_obs.store(0, Ordering::Relaxed);
2383        self.cache.lock().clear();
2384        Ok(())
2385    }
2386
2387    /// Periodic database maintenance: WAL checkpoint, query planner analysis,
2388    /// and FTS index optimization. Call from a background timer.
2389    pub fn run_maintenance(&self) -> Result<()> {
2390        let conn = self.writer.lock();
2391
2392        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2393            .map_err(sqlite_err)?;
2394
2395        conn.execute_batch("PRAGMA optimize(0x10000);")
2396            .map_err(sqlite_err)?;
2397
2398        conn.execute_batch(
2399            "INSERT INTO name_fts(name_fts) VALUES('optimize');
2400             INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2401        )
2402        .map_err(sqlite_err)?;
2403
2404        Ok(())
2405    }
2406
2407    /// Run a non-blocking `wal_checkpoint(PASSIVE)` to fsync committed WAL frames
2408    /// without stalling readers or writers. Call from a short-interval timer to
2409    /// bound the durability window in `async` mode.
2410    pub fn checkpoint_passive(&self) -> Result<()> {
2411        let conn = self.writer.lock();
2412        conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2413            .map_err(sqlite_err)?;
2414        Ok(())
2415    }
2416
2417    fn _traverse(
2418        &self,
2419        name: &str,
2420        direction: Direction,
2421        rtype: Option<&str>,
2422        depth: u32,
2423        // unused — we always include relations; the caller controls via depth
2424        _include_relations: bool,
2425    ) -> Result<String> {
2426        let conn = self.readers.get();
2427        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2428            Some(v) => v,
2429            None => {
2430                return Err(MCSError::InvalidParams(format!(
2431                    "Entity '{name}' not found"
2432                )))
2433            }
2434        };
2435
2436        let mut all_ids: HashSet<i64> = HashSet::new();
2437        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2438        let mut frontier: HashSet<i64> = HashSet::new();
2439        all_ids.insert(start_id);
2440        frontier.insert(start_id);
2441
2442        // Read-only type resolution. A requested-but-missing type uses the
2443        // sentinel id -1 (matches no edge), so traversal yields just the start
2444        // entity instead of falling back to "no type filter" and walking every
2445        // edge. `get_type_id` is avoided here: it inserts and cannot run on the
2446        // `query_only` reader connection.
2447        let type_filter: Option<i64> = rtype
2448            .filter(|rt| !rt.is_empty())
2449            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2450
2451        // Pre-compile all four possible queries outside the loop.
2452        let mut q_out_t = conn.prepare_cached(
2453            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2454        let mut q_out   = conn.prepare_cached(
2455            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2456        let mut q_in_t  = conn.prepare_cached(
2457            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2458        let mut q_in    = conn.prepare_cached(
2459            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2460
2461        let mut cur_depth = 0u32;
2462        while cur_depth < depth && !frontier.is_empty() {
2463            let mut next_frontier: HashSet<i64> = HashSet::new();
2464
2465            for &fid in &frontier {
2466                if direction == Direction::Outgoing || direction == Direction::Both {
2467                    if let Some(tid) = type_filter {
2468                        if let Ok(ref mut stmt) = q_out_t
2469                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2470                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2471                            }) {
2472                                for row in rows.flatten() {
2473                                    let (to_id, t_id) = row;
2474                                    all_rels.insert((fid, to_id, t_id));
2475                                    if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2476                                }
2477                            }
2478                    } else if let Ok(ref mut stmt) = q_out
2479                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2480                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2481                        }) {
2482                            for row in rows.flatten() {
2483                                let (to_id, t_id) = row;
2484                                all_rels.insert((fid, to_id, t_id));
2485                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2486                            }
2487                        }
2488                }
2489
2490                if direction == Direction::Incoming || direction == Direction::Both {
2491                    if let Some(tid) = type_filter {
2492                        if let Ok(ref mut stmt) = q_in_t
2493                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2494                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2495                            }) {
2496                                for row in rows.flatten() {
2497                                    let (from_id, t_id) = row;
2498                                    all_rels.insert((from_id, fid, t_id));
2499                                    if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2500                                }
2501                            }
2502                    } else if let Ok(ref mut stmt) = q_in
2503                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2504                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2505                        }) {
2506                            for row in rows.flatten() {
2507                                let (from_id, t_id) = row;
2508                                all_rels.insert((from_id, fid, t_id));
2509                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2510                            }
2511                        }
2512                }
2513            }
2514
2515            frontier = next_frontier;
2516            cur_depth += 1;
2517        }
2518
2519        let entities_json: String = {
2520            if all_ids.is_empty() {
2521                "[]".to_string()
2522            } else {
2523                let ids: Vec<i64> = all_ids.iter().copied().collect();
2524                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2525                let sql = format!(
2526                    "SELECT COALESCE(json_group_array(json_object(
2527                        'name', e.name,
2528                        'entityType', t.name,
2529                        'observations', COALESCE((
2530                            SELECT json_group_array(o.body ORDER BY o.idx)
2531                            FROM observation o WHERE o.entity_id = e.id
2532                        ), json('[]'))
2533                    ) ORDER BY e.id), json('[]'))
2534                    FROM entity e
2535                    JOIN type_dict t ON t.id = e.type_id
2536                    WHERE e.id IN ({}) AND e.flags = 0",
2537                    placeholders.join(",")
2538                );
2539                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2540                    row.get::<_, String>(0)
2541                })
2542                .unwrap_or_else(|_| "[]".to_string())
2543            }
2544        };
2545
2546        let relations_json: String = {
2547            if all_rels.is_empty() {
2548                "[]".to_string()
2549            } else {
2550                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2551                let sql = format!(
2552                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2553                    SELECT COALESCE(json_group_array(json_object(
2554                        'from', e1.name,
2555                        'to', e2.name,
2556                        'relationType', t.name
2557                    )), json('[]'))
2558                    FROM r
2559                    JOIN entity e1 ON e1.id = r.from_id
2560                    JOIN entity e2 ON e2.id = r.to_id
2561                    JOIN type_dict t ON t.id = r.type_id
2562                    WHERE e1.flags = 0 AND e2.flags = 0",
2563                    vals.join(", ")
2564                );
2565                let params: Vec<&dyn ToSql> = all_rels.iter()
2566                    .flat_map(|(f, t, tp)| {
2567                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2568                    })
2569                    .collect();
2570                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2571                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2572                    .unwrap_or_else(|_| "[]".to_string())
2573            }
2574        };
2575
2576        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2577        out.push_str("{\"entities\":");
2578        out.push_str(&entities_json);
2579        out.push_str(",\"relations\":");
2580        out.push_str(&relations_json);
2581        out.push('}');
2582        Ok(out)
2583    }
2584}
2585
2586// ── Tests ────────────────────────────────────────────────────────────────
2587
2588#[cfg(test)]
2589mod tests {
2590    use super::*;
2591    use serde_json::Value;
2592    use std::ops::Deref;
2593    use std::path::PathBuf;
2594
2595    struct TestKg(GraphHandle, PathBuf);
2596
2597    impl Deref for TestKg {
2598        type Target = GraphHandle;
2599        fn deref(&self) -> &GraphHandle {
2600            &self.0
2601        }
2602    }
2603
2604    impl Drop for TestKg {
2605        fn drop(&mut self) {
2606            cleanup_db(&self.1);
2607        }
2608    }
2609
2610    fn cleanup_db(path: &std::path::Path) {
2611        let _ = std::fs::remove_file(path);
2612        let _ = std::fs::remove_file(path.with_extension("db-wal"));
2613        let _ = std::fs::remove_file(path.with_extension("db-shm"));
2614    }
2615
2616    fn new_kg() -> TestKg {
2617        use std::sync::atomic::AtomicU64;
2618        use std::sync::atomic::Ordering;
2619        static COUNTER: AtomicU64 = AtomicU64::new(0);
2620        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2621        let dir = std::env::temp_dir();
2622        let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2623        cleanup_db(&path);
2624        let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2625        TestKg(kg, path)
2626    }
2627
2628    #[test]
2629    fn test_create_and_get_entity() {
2630        let kg = new_kg();
2631        let entities = vec![Entity {
2632            name: "test".into(),
2633            entity_type: "person".into(),
2634            observations: vec!["obs1".into(), "obs2".into()],
2635        }];
2636        let created = kg.create_entities(&entities).unwrap();
2637        assert_eq!(created.len(), 1);
2638
2639        let got = kg.get_entity("test").unwrap().unwrap();
2640        assert_eq!(got.name, "test");
2641        assert_eq!(got.entity_type, "person");
2642        assert_eq!(got.observations, vec!["obs1", "obs2"]);
2643    }
2644
2645    #[test]
2646    fn test_get_nonexistent() {
2647        let kg = new_kg();
2648        assert!(kg.get_entity("nonexistent").unwrap().is_none());
2649    }
2650
2651    #[test]
2652    fn test_delete_entity() {
2653        let kg = new_kg();
2654        kg.create_entities(&[Entity {
2655            name: "del".into(),
2656            entity_type: "t".into(),
2657            observations: vec![],
2658        }])
2659        .unwrap();
2660        assert!(kg.get_entity("del").unwrap().is_some());
2661        kg.delete_entities(&["del".to_string()]).unwrap();
2662        assert!(kg.get_entity("del").unwrap().is_none());
2663    }
2664
2665    #[test]
2666    fn test_add_and_delete_observations() {
2667        let kg = new_kg();
2668        kg.create_entities(&[Entity {
2669            name: "obs_test".into(),
2670            entity_type: "t".into(),
2671            observations: vec!["a".into()],
2672        }])
2673        .unwrap();
2674
2675        let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2676        assert_eq!(added.len(), 2);
2677
2678        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2679        assert!(ent.observations.contains(&"b".into()));
2680        assert!(ent.observations.contains(&"c".into()));
2681
2682        kg.delete_observations("obs_test", &["b".into()]).unwrap();
2683        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2684        assert!(!ent.observations.contains(&"b".into()));
2685        assert!(ent.observations.contains(&"c".into()));
2686        assert!(ent.observations.contains(&"a".into()));
2687    }
2688
2689    #[test]
2690    fn test_create_relations() {
2691        let kg = new_kg();
2692        kg.create_entities(&[
2693            Entity {
2694                name: "A".into(),
2695                entity_type: "node".into(),
2696                observations: vec![],
2697            },
2698            Entity {
2699                name: "B".into(),
2700                entity_type: "node".into(),
2701                observations: vec![],
2702            },
2703        ])
2704        .unwrap();
2705
2706        let rels = kg
2707            .create_relations(&[Relation {
2708                from: "A".into(),
2709                to: "B".into(),
2710                relation_type: "edge".into(),
2711            }])
2712            .unwrap();
2713        assert_eq!(rels.len(), 1);
2714
2715        assert_eq!(kg.get_entity_count().unwrap(), 2);
2716        assert_eq!(kg.get_relation_count().unwrap(), 1);
2717    }
2718
2719    #[test]
2720    fn test_search_nodes() {
2721        let kg = new_kg();
2722        kg.create_entities(&[Entity {
2723            name: "Einstein".into(),
2724            entity_type: "scientist".into(),
2725            observations: vec!["physics".into(), "relativity".into()],
2726        }])
2727        .unwrap();
2728
2729        let results = kg.search_nodes_filtered("physics", None, 0, 10);
2730        assert_eq!(results.len(), 1);
2731        assert_eq!(results[0].name, "Einstein");
2732
2733        let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2734        assert_eq!(results.len(), 1);
2735
2736        let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2737        assert_eq!(results.len(), 0);
2738    }
2739
2740    #[test]
2741    fn test_find_path() {
2742        let kg = new_kg();
2743        kg.create_entities(&[
2744            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2745            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2746            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2747        ]).unwrap();
2748
2749        kg.create_relations(&[
2750            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2751            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2752        ]).unwrap();
2753
2754        let path = kg.find_path("A", "C").unwrap().unwrap();
2755        assert_eq!(path, vec!["A", "B", "C"]);
2756    }
2757
2758    #[test]
2759    fn test_degree() {
2760        let kg = new_kg();
2761        kg.create_entities(&[
2762            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2763            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2764            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2765        ]).unwrap();
2766
2767        kg.create_relations(&[
2768            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2769            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2770        ]).unwrap();
2771
2772        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2773        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2774        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2775    }
2776
2777    #[test]
2778    fn test_neighbors() {
2779        let kg = new_kg();
2780        kg.create_entities(&[
2781            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2782            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2783        ]).unwrap();
2784
2785        kg.create_relations(&[Relation {
2786            from: "A".into(), to: "B".into(), relation_type: "e".into(),
2787        }]).unwrap();
2788
2789        let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2790        let v: Value = serde_json::from_str(&result).unwrap();
2791        assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2792        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2793    }
2794
2795    #[test]
2796    fn test_open_nodes() {
2797        let kg = new_kg();
2798        kg.create_entities(&[
2799            Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2800            Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2801        ]).unwrap();
2802
2803        kg.create_relations(&[Relation {
2804            from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2805        }]).unwrap();
2806
2807        let result = kg.open_nodes(&["X".into()]);
2808        let v: Value = serde_json::from_str(&result).unwrap();
2809        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2810        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2811    }
2812
2813    #[test]
2814    fn test_entities_exist() {
2815        let kg = new_kg();
2816        kg.create_entities(&[Entity {
2817            name: "exists".into(), entity_type: "t".into(), observations: vec![],
2818        }]).unwrap();
2819
2820        let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2821        assert_eq!(res, vec![true, false]);
2822    }
2823
2824    #[test]
2825    fn test_describe_entity() {
2826        let kg = new_kg();
2827        kg.create_entities(&[Entity {
2828            name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2829        }]).unwrap();
2830
2831        let entity = kg.describe_entity("desc").unwrap();
2832        assert_eq!(entity.name, "desc");
2833    }
2834
2835    #[test]
2836    fn test_entity_type_counts() {
2837        let kg = new_kg();
2838        kg.create_entities(&[
2839            Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2840            Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2841            Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2842        ]).unwrap();
2843
2844        let counts = kg.entity_type_counts();
2845        let map: FxHashMap<_, _> = counts.into_iter().collect();
2846        assert_eq!(map.get("person"), Some(&2));
2847        assert_eq!(map.get("place"), Some(&1));
2848    }
2849
2850    #[test]
2851    fn test_relation_type_counts() {
2852        let kg = new_kg();
2853        kg.create_entities(&[
2854            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2855            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2856            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2857        ]).unwrap();
2858
2859        kg.create_relations(&[
2860            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2861            Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2862        ]).unwrap();
2863
2864        let counts = kg.relation_type_counts();
2865        let map: FxHashMap<_, _> = counts.into_iter().collect();
2866        assert_eq!(map.get("knows"), Some(&2));
2867    }
2868
2869    #[test]
2870    fn test_upsert_entities() {
2871        let kg = new_kg();
2872        kg.create_entities(&[Entity {
2873            name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2874        }]).unwrap();
2875
2876        // Upsert with new type and additional observation.
2877        kg.upsert_entities(&[Entity {
2878            name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2879        }]).unwrap();
2880
2881        let ent = kg.get_entity("u").unwrap().unwrap();
2882        assert_eq!(ent.entity_type, "new");
2883        assert!(ent.observations.contains(&"added".into()));
2884        assert!(ent.observations.contains(&"existing".into()));
2885    }
2886
2887    #[test]
2888    fn test_merge_entities() {
2889        let kg = new_kg();
2890        kg.create_entities(&[
2891            Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2892            Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2893        ]).unwrap();
2894
2895        kg.create_relations(&[Relation {
2896            from: "source".into(), to: "target".into(), relation_type: "e".into(),
2897        }]).unwrap();
2898
2899        let merged = kg.merge_entities("source", "target").unwrap();
2900        assert_eq!(merged.name, "target");
2901        assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2902    }
2903
2904    #[test]
2905    fn test_find_all_paths() {
2906        let kg = new_kg();
2907        kg.create_entities(&[
2908            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2909            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2910            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2911        ]).unwrap();
2912
2913        kg.create_relations(&[
2914            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2915            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2916            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2917        ]).unwrap();
2918
2919        let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2920        assert!(paths.len() >= 2);
2921    }
2922
2923    #[test]
2924    fn test_batch_get_entities() {
2925        let kg = new_kg();
2926        kg.create_entities(&[
2927            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2928            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2929        ]).unwrap();
2930
2931        let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2932        assert_eq!(results.len(), 3);
2933        assert!(results[0].is_some());
2934        assert!(results[1].is_none());
2935        assert!(results[2].is_some());
2936    }
2937
2938    #[test]
2939    fn test_export_graph() {
2940        let kg = new_kg();
2941        kg.create_entities(&[Entity {
2942            name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2943        }]).unwrap();
2944
2945        let exported = kg.export("json", i64::MAX).unwrap();
2946        assert!(exported.contains("exp"));
2947        assert!(exported.contains("o"));
2948    }
2949
2950    #[test]
2951    fn test_graph_stats() {
2952        let kg = new_kg();
2953        assert_eq!(kg.get_entity_count().unwrap(), 0);
2954        assert_eq!(kg.get_relation_count().unwrap(), 0);
2955
2956        kg.create_entities(&[Entity {
2957            name: "s".into(), entity_type: "t".into(), observations: vec![],
2958        }]).unwrap();
2959
2960        assert_eq!(kg.get_entity_count().unwrap(), 1);
2961    }
2962
2963    #[test]
2964    fn test_read_graph_filtered() {
2965        let kg = new_kg();
2966        kg.create_entities(&[
2967            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2968            Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2969        ]).unwrap();
2970
2971        let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2972        let v: Value = serde_json::from_str(&out).unwrap();
2973        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2974        assert_eq!(v["entities"][0]["name"], "p1");
2975    }
2976
2977    #[test]
2978    fn test_wipe() {
2979        let kg = new_kg();
2980        kg.create_entities(&[Entity {
2981            name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2982        }]).unwrap();
2983        assert_eq!(kg.get_entity_count().unwrap(), 1);
2984
2985        kg.wipe().unwrap();
2986        assert_eq!(kg.get_entity_count().unwrap(), 0);
2987    }
2988
2989    #[test]
2990    fn test_push_json_str() {
2991        let mut buf = String::new();
2992        push_json_str(&mut buf, "hello");
2993        assert_eq!(buf, "\"hello\"");
2994        let mut buf = String::new();
2995        push_json_str(&mut buf, "he\"llo");
2996        assert_eq!(buf, "\"he\\\"llo\"");
2997    }
2998
2999    // ── create_entities edge cases ────────────────────────────────────
3000
3001    #[test]
3002    fn test_create_entities_empty_input() {
3003        let kg = new_kg();
3004        let created = kg.create_entities(&[]).unwrap();
3005        assert!(created.is_empty());
3006    }
3007
3008    #[test]
3009    fn test_create_entities_skip_empty_name() {
3010        let kg = new_kg();
3011        let created = kg.create_entities(&[Entity {
3012            name: "".into(),
3013            entity_type: "t".into(),
3014            observations: vec![],
3015        }])
3016        .unwrap();
3017        assert!(created.is_empty());
3018        assert_eq!(kg.get_entity_count().unwrap(), 0);
3019    }
3020
3021    #[test]
3022    fn test_create_entities_duplicate_names() {
3023        let kg = new_kg();
3024        let e = Entity {
3025            name: "dup".into(),
3026            entity_type: "t".into(),
3027            observations: vec!["obs".into()],
3028        };
3029        let first = kg.create_entities(&[e.clone()]).unwrap();
3030        assert_eq!(first.len(), 1);
3031        let second = kg.create_entities(&[e.clone()]).unwrap();
3032        assert!(second.is_empty());
3033        assert_eq!(kg.get_entity_count().unwrap(), 1);
3034    }
3035
3036    #[test]
3037    fn test_create_entities_partial_duplicates() {
3038        let kg = new_kg();
3039        let created = kg.create_entities(&[
3040            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3041            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3042        ]).unwrap();
3043        assert_eq!(created.len(), 2);
3044
3045        let second = kg.create_entities(&[
3046            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3047            Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3048        ]).unwrap();
3049        assert_eq!(second.len(), 1); // only c created
3050        assert_eq!(second[0].name, "c");
3051        assert_eq!(kg.get_entity_count().unwrap(), 3);
3052    }
3053
3054    #[test]
3055    fn test_create_entities_mixed_empty_and_valid() {
3056        let kg = new_kg();
3057        let created = kg.create_entities(&[
3058            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3059            Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3060            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3061        ]).unwrap();
3062        assert_eq!(created.len(), 1);
3063        assert_eq!(created[0].name, "valid");
3064        assert_eq!(kg.get_entity_count().unwrap(), 1);
3065    }
3066
3067    #[test]
3068    fn test_create_entities_same_name_in_batch() {
3069        let kg = new_kg();
3070        let created = kg.create_entities(&[
3071            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3072            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3073        ]).unwrap();
3074        assert_eq!(created.len(), 1);
3075        assert_eq!(kg.get_entity_count().unwrap(), 1);
3076    }
3077
3078    // ── create_relations edge cases ───────────────────────────────────
3079
3080    #[test]
3081    fn test_create_relations_empty_input() {
3082        let kg = new_kg();
3083        let rels = kg.create_relations(&[]).unwrap();
3084        assert!(rels.is_empty());
3085    }
3086
3087    #[test]
3088    fn test_create_relations_nonexistent_from() {
3089        let kg = new_kg();
3090        kg.create_entities(&[Entity {
3091            name: "B".into(), entity_type: "t".into(), observations: vec![],
3092        }]).unwrap();
3093
3094        let rels = kg.create_relations(&[Relation {
3095            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3096        }]).unwrap();
3097        assert!(rels.is_empty());
3098        assert_eq!(kg.get_relation_count().unwrap(), 0);
3099    }
3100
3101    #[test]
3102    fn test_create_relations_nonexistent_to() {
3103        let kg = new_kg();
3104        kg.create_entities(&[Entity {
3105            name: "A".into(), entity_type: "t".into(), observations: vec![],
3106        }]).unwrap();
3107
3108        let rels = kg.create_relations(&[Relation {
3109            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3110        }]).unwrap();
3111        assert!(rels.is_empty());
3112        assert_eq!(kg.get_relation_count().unwrap(), 0);
3113    }
3114
3115    #[test]
3116    fn test_create_relations_both_nonexistent() {
3117        let kg = new_kg();
3118        let rels = kg.create_relations(&[Relation {
3119            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3120        }]).unwrap();
3121        assert!(rels.is_empty());
3122    }
3123
3124    #[test]
3125    fn test_create_relations_self_loop() {
3126        let kg = new_kg();
3127        kg.create_entities(&[Entity {
3128            name: "self".into(), entity_type: "t".into(), observations: vec![],
3129        }]).unwrap();
3130
3131        let rels = kg.create_relations(&[Relation {
3132            from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3133        }]).unwrap();
3134        assert_eq!(rels.len(), 1);
3135        assert_eq!(kg.get_relation_count().unwrap(), 1);
3136        assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3137        assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3138    }
3139
3140    #[test]
3141    fn test_create_relations_duplicate() {
3142        let kg = new_kg();
3143        kg.create_entities(&[
3144            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3145            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3146        ]).unwrap();
3147
3148        let r = Relation {
3149            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3150        };
3151        let first = kg.create_relations(&[r.clone()]).unwrap();
3152        assert_eq!(first.len(), 1);
3153
3154        let second = kg.create_relations(&[r.clone()]).unwrap();
3155        assert!(second.is_empty());
3156        assert_eq!(kg.get_relation_count().unwrap(), 1);
3157    }
3158
3159    #[test]
3160    fn test_create_relations_new_type_auto_created() {
3161        let kg = new_kg();
3162        kg.create_entities(&[
3163            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3164            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3165        ]).unwrap();
3166
3167        let rels = kg.create_relations(&[Relation {
3168            from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3169        }]).unwrap();
3170        assert_eq!(rels.len(), 1);
3171
3172        let counts = kg.relation_type_counts();
3173        let map: FxHashMap<_, _> = counts.into_iter().collect();
3174        assert_eq!(map.get("brand_new_type"), Some(&1));
3175    }
3176
3177    #[test]
3178    fn test_create_relations_degree_updates() {
3179        let kg = new_kg();
3180        kg.create_entities(&[
3181            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3182            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3183            Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3184        ]).unwrap();
3185
3186        kg.create_relations(&[
3187            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3188            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3189        ]).unwrap();
3190
3191        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3192        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3193        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3194        assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3195        assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3196    }
3197
3198    #[test]
3199    fn test_create_relations_delete_and_recreate() {
3200        let kg = new_kg();
3201        kg.create_entities(&[
3202            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3203            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3204        ]).unwrap();
3205
3206        let r = Relation {
3207            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3208        };
3209        kg.create_relations(&[r.clone()]).unwrap();
3210        assert_eq!(kg.get_relation_count().unwrap(), 1);
3211
3212        kg.delete_relations(&[r.clone()]).unwrap();
3213        assert_eq!(kg.get_relation_count().unwrap(), 0);
3214
3215        // Recreate after delete
3216        let re = kg.create_relations(&[r.clone()]).unwrap();
3217        assert_eq!(re.len(), 1);
3218        assert_eq!(kg.get_relation_count().unwrap(), 1);
3219    }
3220
3221    // ── Integration edge cases ────────────────────────────────────────
3222
3223    #[test]
3224    fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3225        let kg = new_kg();
3226        kg.create_entities(&[
3227            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3228            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3229        ]).unwrap();
3230        kg.create_relations(&[
3231            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3232        ]).unwrap();
3233
3234        assert_eq!(kg.get_relation_count().unwrap(), 1);
3235
3236        // Deleting entity A should also delete the relation
3237        kg.delete_entities(&["A".into()]).unwrap();
3238        assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3239        assert_eq!(kg.get_relation_count().unwrap(), 0);
3240    }
3241
3242    #[test]
3243    fn test_graph_stats_after_entity_with_observations() {
3244        let kg = new_kg();
3245        kg.create_entities(&[Entity {
3246            name: "stat".into(), entity_type: "t".into(),
3247            observations: vec!["o1".into(), "o2".into(), "o3".into()],
3248        }]).unwrap();
3249
3250        let ecount = kg.get_entity_count().unwrap();
3251        // graph_stat for observations is tracked but there's no public getter for it
3252        assert_eq!(ecount, 1);
3253
3254        // delete reverts stats
3255        kg.delete_entities(&["stat".into()]).unwrap();
3256        assert_eq!(kg.get_entity_count().unwrap(), 0);
3257    }
3258
3259    // ── Helpers for the fix-specific suites ────────────────────────────────
3260
3261    fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3262        use std::sync::atomic::AtomicU64;
3263        static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3264        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3265        let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3266        cleanup_db(&path);
3267        let kg = GraphHandle::new(
3268            &path,
3269            Durability::Async,
3270            SqliteTuning::default(),
3271            NonZeroUsize::new(10_000).unwrap(),
3272            read_pool_size,
3273        )
3274        .expect("create KG");
3275        TestKg(kg, path)
3276    }
3277
3278    fn seed_line(kg: &GraphHandle, n: usize) {
3279        let entities: Vec<Entity> = (0..n)
3280            .map(|i| Entity {
3281                name: format!("n{i}"),
3282                entity_type: "node".into(),
3283                observations: vec![format!("obs of n{i}")],
3284            })
3285            .collect();
3286        kg.create_entities(&entities).unwrap();
3287        let rels: Vec<Relation> = (0..n.saturating_sub(1))
3288            .map(|i| Relation {
3289                from: format!("n{i}"),
3290                to: format!("n{}", i + 1),
3291                relation_type: "edge".into(),
3292            })
3293            .collect();
3294        if !rels.is_empty() {
3295            kg.create_relations(&rels).unwrap();
3296        }
3297    }
3298
3299    fn count_relations(graph_json: &str) -> usize {
3300        let v: Value = serde_json::from_str(graph_json).unwrap();
3301        v["relations"].as_array().unwrap().len()
3302    }
3303
3304    fn count_entities(graph_json: &str) -> usize {
3305        let v: Value = serde_json::from_str(graph_json).unwrap();
3306        v["entities"].as_array().unwrap().len()
3307    }
3308
3309    // ── Fix #1: reader pool / concurrency ──────────────────────────────────
3310
3311    #[test]
3312    fn test_pool_size_one_still_works() {
3313        let kg = new_kg_with_pool(1);
3314        seed_line(&kg, 5);
3315        assert_eq!(kg.get_entity_count().unwrap(), 5);
3316        assert!(kg.get_entity("n2").unwrap().is_some());
3317        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3318        assert_eq!(count_entities(&g), 5);
3319    }
3320
3321    #[test]
3322    fn test_reads_see_committed_writes() {
3323        // A read on a pool connection must observe a just-committed write made on
3324        // the writer connection (WAL visibility).
3325        let kg = new_kg_with_pool(4);
3326        kg.create_entities(&[Entity {
3327            name: "fresh".into(),
3328            entity_type: "t".into(),
3329            observations: vec!["v".into()],
3330        }])
3331        .unwrap();
3332        // get_entity goes through the reader pool.
3333        let got = kg.get_entity("fresh").unwrap().unwrap();
3334        assert_eq!(got.observations, vec!["v"]);
3335    }
3336
3337    #[test]
3338    fn test_concurrent_readers_consistent() {
3339        // Many readers hammering the pool while the writer mutates must never
3340        // panic, deadlock, or observe a torn graph. The final counts must match.
3341        let kg = new_kg_with_pool(4);
3342        seed_line(&kg, 50);
3343
3344        std::thread::scope(|s| {
3345            // 8 reader threads.
3346            for _ in 0..8 {
3347                s.spawn(|| {
3348                    for _ in 0..200 {
3349                        let _ = kg.get_entity("n10");
3350                        let _ = kg.search_nodes_filtered("obs", None, 0, 10);
3351                        let _ = kg.read_graph_filtered(None, 0, 100);
3352                        let _ = kg.get_entity_count();
3353                        let _ = kg.neighbors("n10", Direction::Both, None, 2);
3354                    }
3355                });
3356            }
3357            // 1 writer thread adding more entities concurrently.
3358            s.spawn(|| {
3359                for i in 100..160 {
3360                    kg.create_entities(&[Entity {
3361                        name: format!("w{i}"),
3362                        entity_type: "node".into(),
3363                        observations: vec![format!("w obs {i}")],
3364                    }])
3365                    .unwrap();
3366                }
3367            });
3368        });
3369
3370        // 50 seeded + 60 written.
3371        assert_eq!(kg.get_entity_count().unwrap(), 110);
3372        assert!(kg.get_entity("w159").unwrap().is_some());
3373    }
3374
3375    #[test]
3376    fn test_reader_pool_rejects_writes_internally() {
3377        // Sanity: query_only readers cannot mutate. We can't call a write through
3378        // the pool directly, but we can confirm a read method that *would* have
3379        // inserted (search_relations resolving a missing type) does not create a
3380        // phantom type — see the dedicated test below — and that reads under a
3381        // size-1 pool serialize correctly without deadlock.
3382        let kg = new_kg_with_pool(1);
3383        seed_line(&kg, 3);
3384        std::thread::scope(|s| {
3385            for _ in 0..4 {
3386                s.spawn(|| {
3387                    for _ in 0..100 {
3388                        let _ = kg.read_graph_filtered(None, 0, 10);
3389                    }
3390                });
3391            }
3392        });
3393        assert_eq!(kg.get_entity_count().unwrap(), 3);
3394    }
3395
3396    // ── Fix #6: read_graph relation scoping + export bound ─────────────────
3397
3398    #[test]
3399    fn test_read_graph_relations_scoped_to_page() {
3400        let kg = new_kg_with_pool(2);
3401        // n0 -> n1 -> n2 -> n3 (4 entities, 3 edges).
3402        seed_line(&kg, 4);
3403
3404        // Full page: all 3 edges present.
3405        let full = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3406        assert_eq!(count_entities(&full), 4);
3407        assert_eq!(count_relations(&full), 3);
3408
3409        // Page of only the first entity (n0): its only edge n0->n1 has an
3410        // endpoint (n1) outside the page, so no relations are returned.
3411        let page1 = kg.read_graph_filtered(None, 0, 1).unwrap();
3412        assert_eq!(count_entities(&page1), 1);
3413        assert_eq!(count_relations(&page1), 0);
3414
3415        // Page of first two entities (n0, n1): edge n0->n1 fully inside, n1->n2
3416        // straddles the boundary and is excluded.
3417        let page2 = kg.read_graph_filtered(None, 0, 2).unwrap();
3418        assert_eq!(count_entities(&page2), 2);
3419        assert_eq!(count_relations(&page2), 1);
3420    }
3421
3422    #[test]
3423    fn test_read_graph_pagination_offset() {
3424        let kg = new_kg_with_pool(2);
3425        seed_line(&kg, 5);
3426        let g = kg.read_graph_filtered(None, 2, 2).unwrap();
3427        assert_eq!(count_entities(&g), 2);
3428        // Entities are ordered by id; offset 2 skips n0, n1.
3429        assert!(!g.contains("\"n0\""));
3430        assert!(!g.contains("\"n1\""));
3431        assert!(g.contains("\"n2\""));
3432        assert!(g.contains("\"n3\""));
3433    }
3434
3435    #[test]
3436    fn test_read_graph_empty() {
3437        let kg = new_kg_with_pool(2);
3438        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3439        assert_eq!(g, r#"{"entities":[],"relations":[]}"#);
3440    }
3441
3442    #[test]
3443    fn test_read_graph_filtered_by_type() {
3444        let kg = new_kg_with_pool(2);
3445        kg.create_entities(&[
3446            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
3447            Entity { name: "q1".into(), entity_type: "place".into(), observations: vec![] },
3448            Entity { name: "p2".into(), entity_type: "person".into(), observations: vec![] },
3449        ])
3450        .unwrap();
3451        let g = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
3452        assert_eq!(count_entities(&g), 2);
3453        assert!(g.contains("\"p1\""));
3454        assert!(g.contains("\"p2\""));
3455        assert!(!g.contains("\"q1\""));
3456    }
3457
3458    #[test]
3459    fn test_export_respects_max_rows() {
3460        let kg = new_kg_with_pool(2);
3461        seed_line(&kg, 5);
3462
3463        // Unbounded export returns everything.
3464        let full = kg.export("json", i64::MAX).unwrap();
3465        assert_eq!(count_entities(&full), 5);
3466        assert_eq!(count_relations(&full), 4);
3467
3468        // Capped export truncates both arrays to the cap.
3469        let capped = kg.export("json", 2).unwrap();
3470        assert_eq!(count_entities(&capped), 2);
3471        assert_eq!(count_relations(&capped), 2);
3472    }
3473
3474    #[test]
3475    fn test_export_negative_max_rows_is_unbounded() {
3476        let kg = new_kg_with_pool(2);
3477        seed_line(&kg, 3);
3478        // SQLite treats a negative LIMIT as "no limit".
3479        let out = kg.export("json", -1).unwrap();
3480        assert_eq!(count_entities(&out), 3);
3481    }
3482
3483    // ── Fix #8: writes remain correct without the per-write PRAGMA optimize ─
3484
3485    #[test]
3486    fn test_many_small_write_batches_stay_consistent() {
3487        let kg = new_kg_with_pool(2);
3488        for i in 0..100 {
3489            kg.create_entities(&[Entity {
3490                name: format!("e{i}"),
3491                entity_type: "t".into(),
3492                observations: vec![format!("o{i}")],
3493            }])
3494            .unwrap();
3495        }
3496        assert_eq!(kg.get_entity_count().unwrap(), 100);
3497        // Search must still find a needle inserted across many tiny batches,
3498        // proving FTS stayed consistent without per-write optimization.
3499        let hits = kg.search_nodes_filtered("e57", None, 0, 10);
3500        assert!(hits.iter().any(|e| e.name == "e57"));
3501    }
3502
3503    // ── Fix #9: wipe fully resets the FTS indexes ──────────────────────────
3504
3505    #[test]
3506    fn test_wipe_clears_name_and_obs_fts() {
3507        let kg = new_kg_with_pool(2);
3508        kg.create_entities(&[Entity {
3509            name: "Einstein".into(),
3510            entity_type: "scientist".into(),
3511            observations: vec!["physics".into()],
3512        }])
3513        .unwrap();
3514
3515        // Both FTS indexes resolve before the wipe.
3516        assert_eq!(kg.search_nodes_filtered("Einstein", None, 0, 10).len(), 1);
3517        assert_eq!(kg.search_nodes_filtered("physics", None, 0, 10).len(), 1);
3518
3519        kg.wipe().unwrap();
3520
3521        // After wipe both indexes must be empty (a bare DELETE on an
3522        // external-content FTS5 table would have left stale rowids behind).
3523        assert_eq!(kg.get_entity_count().unwrap(), 0);
3524        assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
3525        assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
3526    }
3527
3528    #[test]
3529    fn test_wipe_then_recreate_search_works() {
3530        // Recreating the same names after a wipe must produce a clean, searchable
3531        // index — not a corrupted one or duplicate FTS rows.
3532        let kg = new_kg_with_pool(2);
3533        kg.create_entities(&[Entity {
3534            name: "Einstein".into(),
3535            entity_type: "scientist".into(),
3536            observations: vec!["physics".into()],
3537        }])
3538        .unwrap();
3539        kg.wipe().unwrap();
3540
3541        kg.create_entities(&[Entity {
3542            name: "Einstein".into(),
3543            entity_type: "scientist".into(),
3544            observations: vec!["physics".into(), "relativity".into()],
3545        }])
3546        .unwrap();
3547
3548        let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
3549        assert_eq!(by_name.len(), 1, "exactly one Einstein after recreate");
3550        let by_obs = kg.search_nodes_filtered("relativity", None, 0, 10);
3551        assert_eq!(by_obs.len(), 1);
3552        assert_eq!(kg.get_entity_count().unwrap(), 1);
3553    }
3554
3555    // ── Read-only type/entity resolution (introduced by the reader pool) ───
3556
3557    #[test]
3558    fn test_search_relations_missing_type_returns_empty() {
3559        let kg = new_kg_with_pool(2);
3560        seed_line(&kg, 3); // edges of type "edge"
3561        // A filter for a relation type that does not exist must return nothing,
3562        // not every relation — and must not create a phantom type row.
3563        let r = kg.search_relations(None, None, Some("does_not_exist"));
3564        assert!(r.is_empty());
3565        // The phantom type must not have been inserted by the read.
3566        let types = kg.relation_type_counts();
3567        assert!(types.iter().all(|(t, _)| t != "does_not_exist"));
3568    }
3569
3570    #[test]
3571    fn test_search_relations_missing_from_returns_empty() {
3572        let kg = new_kg_with_pool(2);
3573        seed_line(&kg, 3);
3574        let r = kg.search_relations(Some("ghost"), None, None);
3575        assert!(r.is_empty(), "missing 'from' must not match every relation");
3576    }
3577
3578    #[test]
3579    fn test_search_relations_existing_filters_still_work() {
3580        let kg = new_kg_with_pool(2);
3581        seed_line(&kg, 3);
3582        let r = kg.search_relations(Some("n0"), None, Some("edge"));
3583        assert_eq!(r.len(), 1);
3584        assert_eq!(r[0].from, "n0");
3585        assert_eq!(r[0].to, "n1");
3586    }
3587
3588    #[test]
3589    fn test_neighbors_missing_type_returns_only_start() {
3590        let kg = new_kg_with_pool(2);
3591        seed_line(&kg, 3);
3592        let json = kg
3593            .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
3594            .unwrap();
3595        // No edge matches the bogus type, so only the start node comes back.
3596        assert_eq!(count_entities(&json), 1);
3597        assert_eq!(count_relations(&json), 0);
3598    }
3599
3600    #[test]
3601    fn test_neighbors_existing_type_filters() {
3602        let kg = new_kg_with_pool(2);
3603        kg.create_entities(&[
3604            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
3605            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
3606            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
3607        ])
3608        .unwrap();
3609        kg.create_relations(&[
3610            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
3611            Relation { from: "a".into(), to: "c".into(), relation_type: "likes".into() },
3612        ])
3613        .unwrap();
3614        let json = kg.neighbors("a", Direction::Outgoing, Some("knows"), 1).unwrap();
3615        assert!(json.contains("\"b\""));
3616        assert!(!json.contains("\"c\""));
3617        assert_eq!(count_relations(&json), 1);
3618    }
3619
3620    #[test]
3621    fn test_sqlite_tuning_applied_to_fresh_db() {
3622        use std::sync::atomic::AtomicU64;
3623        static COUNTER: AtomicU64 = AtomicU64::new(2_000_000);
3624        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3625        let path = std::env::temp_dir().join(format!("kg_tuning_{}_{}.db", std::process::id(), n));
3626        cleanup_db(&path);
3627
3628        let tuning = SqliteTuning {
3629            page_size: 8192,
3630            ..SqliteTuning::default()
3631        };
3632        let kg = TestKg(
3633            GraphHandle::new(&path, Durability::Async, tuning, NonZeroUsize::new(64).unwrap(), 2)
3634                .expect("create KG"),
3635            path.clone(),
3636        );
3637        kg.create_entities(&[Entity {
3638            name: "a".into(),
3639            entity_type: "n".into(),
3640            observations: vec!["o".into()],
3641        }])
3642        .unwrap();
3643
3644        // page_size (fresh-DB only) and auto_vacuum=INCREMENTAL must have taken
3645        // effect, and journal_mode must be WAL.
3646        let probe = Connection::open(&path).unwrap();
3647        let page_size: i64 = probe.query_row("PRAGMA page_size", [], |r| r.get(0)).unwrap();
3648        assert_eq!(page_size, 8192);
3649        let auto_vacuum: i64 = probe.query_row("PRAGMA auto_vacuum", [], |r| r.get(0)).unwrap();
3650        assert_eq!(auto_vacuum, 2, "expected INCREMENTAL auto_vacuum");
3651        let journal: String = probe.query_row("PRAGMA journal_mode", [], |r| r.get(0)).unwrap();
3652        assert_eq!(journal.to_lowercase(), "wal");
3653    }
3654
3655    #[test]
3656    fn test_checkpoint_passive_is_noop_safe() {
3657        let kg = new_kg();
3658        // On an empty / freshly-written DB a passive checkpoint must succeed.
3659        kg.checkpoint_passive().unwrap();
3660        kg.create_entities(&[Entity {
3661            name: "a".into(),
3662            entity_type: "n".into(),
3663            observations: vec!["o".into()],
3664        }])
3665        .unwrap();
3666        // And after a write, repeatedly, without error or deadlock.
3667        kg.checkpoint_passive().unwrap();
3668        kg.checkpoint_passive().unwrap();
3669        // Data is still readable afterwards.
3670        assert!(kg.get_entity("a").unwrap().is_some());
3671    }
3672}