Skip to main content

mcp_memory/
kg.rs

1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17    MCSError::IoError(std::io::Error::other(e))
18}
19
20const fn is_not_found(e: &rusqlite::Error) -> bool {
21    matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
#[inline(always)]
fn now_us() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as i64,
        // Clock is set before 1970: same result as a zero duration.
        Err(_) => 0,
    }
}
31
/// 64-bit FNV-1a hash of the UTF-8 bytes of `name`, reinterpreted as `i64`
/// so it can be bound directly to an SQLite INTEGER parameter.
#[inline(always)]
pub(crate) fn name_hash(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    let digest = name.bytes().fold(OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    });
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43    let mut stmt = conn
44        .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45        .map_err(sqlite_err)?;
46    let rows = stmt
47        .query_map(params![entity_id], |row| row.get::<_, String>(0))
48        .map_err(sqlite_err)?
49        .filter_map(|r| r.ok())
50        .collect::<Vec<_>>();
51    Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55    load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59    let h = name_hash(name);
60    let mut stmt = conn
61        .prepare_cached(
62            "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63        )
64        .map_err(sqlite_err)?;
65    match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66        Ok(id) => Ok(Some(id)),
67        Err(e) if is_not_found(&e) => Ok(None),
68        Err(e) => Err(sqlite_err(e)),
69    }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73    let mut sel = conn
74        .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75        .map_err(sqlite_err)?;
76    if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77        return Ok(id);
78    }
79    conn.execute(
80        "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81        params![kind, type_name],
82    )
83    .map_err(sqlite_err)?;
84    Ok(conn.last_insert_rowid())
85}
86
87/// Read-only type lookup. Unlike [`get_type_id`] this never inserts, so it is
88/// safe to call on a `query_only` reader connection. Returns `None` when the
89/// type does not exist.
90fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91    conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92        .ok()?
93        .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94        .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98    conn.execute(
99        "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100        params![delta, type_id],
101    )
102    .map_err(sqlite_err)?;
103    Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107    conn.execute(
108        "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109        params![delta, key],
110    )
111    .map_err(sqlite_err)?;
112    Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116    conn.query_row(
117        "SELECT value FROM graph_stat WHERE key = ?1",
118        params![key],
119        |row| row.get(0),
120    )
121    .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125    conn.query_row(
126        "SELECT name FROM type_dict WHERE id = ?1",
127        params![type_id],
128        |row| row.get(0),
129    )
130    .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134    let mut stmt = conn
135        .prepare_cached(
136            "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137        )
138        .map_err(sqlite_err)?;
139    let rows = stmt
140        .query_map(params![kind], |row| {
141            Ok((
142                row.get::<_, String>(0)?,
143                row.get::<_, i64>(1)? as usize,
144            ))
145        })
146        .map_err(sqlite_err)?
147        .filter_map(|r| r.ok())
148        .collect();
149    Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153    let mut stmt = conn
154        .prepare_cached(
155            "SELECT e.name, t.name,
156               COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157             FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158        )
159        .map_err(sqlite_err)?;
160    let (name, etype, obs_json): (String, String, String) = stmt
161        .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162        .map_err(sqlite_err)?;
163    let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164    Ok(Entity {
165        name,
166        entity_type: etype,
167        observations,
168    })
169}
170
/// Which relation edges to follow when traversing from an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Follow edges leaving the entity.
    Outgoing,
    /// Follow edges arriving at the entity.
    Incoming,
    /// Follow edges in either direction.
    Both,
}

impl Direction {
    /// Parse an optional direction keyword.
    ///
    /// Only the exact, upper-case strings `"OUTGOING"` and `"INCOMING"` are
    /// recognized; everything else, including `None`, maps to
    /// [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        if let Some(keyword) = s {
            if keyword == "OUTGOING" {
                return Self::Outgoing;
            }
            if keyword == "INCOMING" {
                return Self::Incoming;
            }
        }
        Self::Both
    }
}
188
/// Append `raw` to `buf` as a quoted JSON string literal.
///
/// Writes directly into the caller's buffer, so no temporary
/// `serde_json::Value` is allocated for the JSON-RPC wrapper.
///
/// `"` and `\` are backslash-escaped; the control characters with a short
/// form (`\b`, `\t`, `\n`, `\f`, `\r`) use it; every other byte below `0x20`
/// is written as `\u00XX`. All remaining text, including multi-byte UTF-8, is
/// copied through unchanged.
///
/// Everything is handled in a single pass. A two-pass scheme (short escapes
/// first, `\u` escapes second) copies control characters verbatim whenever a
/// short escape follows them, which yields invalid JSON.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.push('"');
    // Start of the pending run of bytes that need no escaping.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        let short: Option<char> = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            // Remaining control characters have no short form.
            0x00..=0x1F => None,
            _ => continue,
        };
        // `i` always sits on an ASCII byte, hence on a char boundary, so the
        // slice below cannot split a multi-byte sequence.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Append the six-character JSON escape `\u00XX` for the byte `b`.
///
/// Kept out of line because it is only reached for control characters
/// without a short escape form.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail.
    write!(buf, "\\u{:04x}", b).unwrap();
}
229
230// ── MetaCache ────────────────────────────────────────────────────────────
231
/// Per-entity metadata held in the in-memory LRU cache, keyed by entity name.
///
/// The fields mirror the `id`, `type_id`, `obs_count`, `out_deg` and `in_deg`
/// columns of the `entity` table.
#[derive(Copy, Clone)]
struct EntityMeta {
    /// Row id of the entity.
    id: i64,
    /// `type_dict` id of the entity's type.
    type_id: i64,
    /// Value of the entity's `obs_count` column.
    obs_count: i64,
    /// Value of the entity's `out_deg` column.
    out_deg: i64,
    /// Value of the entity's `in_deg` column.
    in_deg: i64,
}
240
241// ── Transaction guard (RAII rollback on error) ─────────────────────────
242
/// Scope guard for one SQLite transaction on a borrowed connection.
///
/// Dropping the guard while `done` is still `false` issues a `ROLLBACK`.
struct TxGuard<'a> {
    /// Connection the transaction was opened on.
    conn: &'a Connection,
    /// When `true`, dropping the guard does not issue a `ROLLBACK`.
    done: bool,
}
247
248impl<'a> TxGuard<'a> {
249    fn begin(conn: &'a Connection) -> Result<Self> {
250        // BEGIN IMMEDIATE acquires the WAL write lock up front rather than
251        // lazily on the first write. This makes the busy-timeout apply to lock
252        // acquisition deterministically and avoids `SQLITE_BUSY_SNAPSHOT`
253        // surprises when readers are concurrently active.
254        conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255        Ok(Self { conn, done: false })
256    }
257
258    fn commit(mut self) -> Result<()> {
259        self.done = true;
260        self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261    }
262}
263
264impl Drop for TxGuard<'_> {
265    fn drop(&mut self) {
266        if !self.done {
267            let _ = self.conn.execute_batch("ROLLBACK");
268        }
269    }
270}
271
272// ── Reader pool ───────────────────────────────────────────────────────────
273
/// A small fixed pool of `query_only` SQLite connections used for read
/// operations. WAL mode permits any number of concurrent readers alongside the
/// single writer, so spreading reads across several connections lets them run
/// in parallel instead of serializing on the writer's mutex.
struct ReaderPool {
    /// The pooled connections, each behind its own mutex.
    conns: Vec<Mutex<Connection>>,
    /// Monotonic counter used to pick a connection round-robin when every
    /// connection is busy.
    next: AtomicUsize,
}
282
283impl ReaderPool {
284    /// Acquire a reader connection. Fast path: grab the first idle one. If every
285    /// connection is busy, block on a round-robin pick so callers still make
286    /// progress (and never spin).
287    fn get(&self) -> MutexGuard<'_, Connection> {
288        for c in &self.conns {
289            if let Some(g) = c.try_lock() {
290                return g;
291            }
292        }
293        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294        self.conns[i].lock()
295    }
296}
297
298// ── GraphHandle ──────────────────────────────────────────────────────────
299
/// Handle to the SQLite-backed graph: one writer connection, a pool of
/// reader connections, in-memory id sequences and an entity metadata cache.
pub struct GraphHandle {
    /// The single read-write connection. SQLite allows only one writer, so all
    /// mutations serialize here.
    writer: Mutex<Connection>,
    /// Pool of `query_only` connections for concurrent reads (WAL).
    readers: ReaderPool,
    /// Last entity id handed out; initialized from the `entity_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_entity: AtomicI64,
    /// Last observation id handed out; initialized from the `obs_seq` row of
    /// `graph_stat` and written back by `sync_seqs`.
    seq_obs: AtomicI64,
    /// LRU cache from entity name to its `EntityMeta`.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
310
311/// Open one `query_only` reader connection against an existing WAL database.
312///
313/// The connection is opened read-write at the OS level (so it can attach to the
314/// `-shm` wal-index — SQLite cannot read a WAL database through a pure
315/// `SQLITE_OPEN_READ_ONLY` handle) and then locked to reads with
316/// `PRAGMA query_only = ON`, which makes any accidental write error out.
317fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
318    let conn = Connection::open_with_flags(
319        path,
320        OpenFlags::SQLITE_OPEN_READ_WRITE
321            | OpenFlags::SQLITE_OPEN_NO_MUTEX
322            | OpenFlags::SQLITE_OPEN_URI,
323    )
324    .map_err(sqlite_err)?;
325    conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
326        .map_err(sqlite_err)?;
327    conn.execute_batch(&format!(
328        "PRAGMA query_only   = ON;
329         PRAGMA cache_size   = -{};
330         PRAGMA temp_store   = MEMORY;
331         PRAGMA mmap_size    = {};",
332        tuning.cache_size_kb, tuning.mmap_size
333    ))
334    .map_err(sqlite_err)?;
335    Ok(conn)
336}
337
338impl GraphHandle {
339    pub fn new(
340        path: &Path,
341        durability: Durability,
342        tuning: SqliteTuning,
343        lru_cache_size: NonZeroUsize,
344        read_pool_size: usize,
345    ) -> Result<Self> {
346        let conn = Connection::open(path).map_err(sqlite_err)?;
347        // Apply the busy handler through the API so it is in force for every
348        // subsequent statement (including schema creation and BEGIN IMMEDIATE).
349        conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
350            .map_err(sqlite_err)?;
351
352        // `page_size` and `auto_vacuum` are fixed when the database first gets
353        // content, and `page_size` additionally must precede `journal_mode=WAL`.
354        // Set both up front on this connection, before any table is created, so
355        // they take effect on a fresh database. On an existing database they are
356        // silently ignored (would require VACUUM to change).
357        conn.execute_batch(&format!(
358            "PRAGMA page_size    = {};
359             PRAGMA auto_vacuum  = INCREMENTAL;",
360            tuning.page_size
361        ))
362        .map_err(sqlite_err)?;
363
364        conn.execute_batch(&format!(
365             "PRAGMA journal_mode = WAL;
366             PRAGMA foreign_keys = OFF;
367             PRAGMA cache_size    = -{};
368             PRAGMA temp_store    = MEMORY;
369             PRAGMA busy_timeout  = {};
370             PRAGMA synchronous   = NORMAL;
371             PRAGMA journal_size_limit = {};",
372            tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
373        ))
374        .map_err(sqlite_err)?;
375
376        conn.execute_batch(
377             "CREATE TABLE IF NOT EXISTS entity (
378                 id          INTEGER PRIMARY KEY,
379                 name_hash   INTEGER NOT NULL,
380                 name        TEXT    NOT NULL,
381                 type_id     INTEGER NOT NULL,
382                 obs_count   INTEGER NOT NULL DEFAULT 0,
383                 out_deg     INTEGER NOT NULL DEFAULT 0,
384                 in_deg      INTEGER NOT NULL DEFAULT 0,
385                 created_us  INTEGER NOT NULL,
386                 updated_us  INTEGER NOT NULL,
387                 flags       INTEGER NOT NULL DEFAULT 0
388             ) STRICT;
389
390             CREATE INDEX IF NOT EXISTS entity_by_hash
391                 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
392                 WHERE flags = 0;
393
394             CREATE INDEX IF NOT EXISTS entity_name_ci
395                 ON entity(lower(name))
396                 WHERE flags = 0;
397
398             CREATE TABLE IF NOT EXISTS observation (
399                 id          INTEGER PRIMARY KEY,
400                 entity_id   INTEGER NOT NULL,
401                 idx         INTEGER NOT NULL,
402                 body        TEXT    NOT NULL,
403                 created_us  INTEGER NOT NULL
404             ) STRICT;
405
406             CREATE INDEX IF NOT EXISTS obs_by_entity
407                 ON observation(entity_id, idx);
408
409             CREATE TABLE IF NOT EXISTS relation (
410                 from_id     INTEGER NOT NULL,
411                 to_id       INTEGER NOT NULL,
412                 type_id     INTEGER NOT NULL,
413                 created_us  INTEGER NOT NULL
414             ) STRICT;
415
416             CREATE INDEX IF NOT EXISTS rel_out
417                 ON relation(from_id, type_id, to_id);
418
419             CREATE INDEX IF NOT EXISTS rel_in
420                 ON relation(to_id, type_id, from_id);
421
422             CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
423                 USING fts5(name, content='entity', content_rowid='id',
424                            tokenize='unicode61 remove_diacritics 2');
425
426             CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
427                 USING fts5(body, content='observation', content_rowid='id',
428                            tokenize='unicode61 remove_diacritics 2');
429
430             CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
431               INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
432             END;
433
434             CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
435               INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
436             END;
437
438             CREATE TABLE IF NOT EXISTS type_dict (
439                 id     INTEGER PRIMARY KEY,
440                 kind   INTEGER NOT NULL,
441                 name   TEXT    NOT NULL,
442                 count  INTEGER NOT NULL DEFAULT 0
443             ) STRICT;
444
445             CREATE INDEX IF NOT EXISTS type_by_name
446                 ON type_dict(kind, name);
447
448             CREATE TABLE IF NOT EXISTS graph_stat (
449                 key    TEXT NOT NULL PRIMARY KEY,
450                 value  INTEGER NOT NULL
451             ) STRICT, WITHOUT ROWID;
452
453             CREATE TABLE IF NOT EXISTS hub_degree (
454                 entity_id INTEGER PRIMARY KEY,
455                 out_deg   INTEGER NOT NULL,
456                 in_deg    INTEGER NOT NULL
457             ) STRICT;
458
459             CREATE TABLE IF NOT EXISTS partition_map (
460                 table_name TEXT NOT NULL PRIMARY KEY,
461                 role       INTEGER NOT NULL,
462                 type_id    INTEGER,
463                 row_count  INTEGER NOT NULL DEFAULT 0
464             ) STRICT, WITHOUT ROWID;",
465        )
466        .map_err(sqlite_err)?;
467
468        conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
469            .map_err(sqlite_err)?;
470
471        let sync_pragma = match durability {
472            Durability::Sync => "PRAGMA synchronous = FULL",
473            Durability::Async => "PRAGMA synchronous = NORMAL",
474        };
475        conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
476
477        // Bound the cost of `PRAGMA optimize` (here and in maintenance) so a
478        // large database cannot stall startup/maintenance analyzing every index.
479        conn.execute_batch("PRAGMA analysis_limit = 400;")
480            .map_err(sqlite_err)?;
481
482        let has_stat: bool = conn
483            .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
484            .is_ok();
485        if !has_stat {
486            conn.execute_batch(
487                "INSERT INTO graph_stat(key, value) VALUES
488                 ('entities', 0), ('relations', 0), ('observations', 0),
489                 ('entity_seq', 0), ('obs_seq', 0);",
490            )
491            .map_err(sqlite_err)?;
492        }
493
494        conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
495
496        let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
497        let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
498
499        // Open the reader pool against the now-initialized database. At least one
500        // reader is always created.
501        let pool_size = read_pool_size.max(1);
502        let mut conns = Vec::with_capacity(pool_size);
503        for _ in 0..pool_size {
504            conns.push(Mutex::new(open_reader(path, &tuning)?));
505        }
506        let readers = ReaderPool {
507            conns,
508            next: AtomicUsize::new(0),
509        };
510
511        Ok(Self {
512            writer: Mutex::new(conn),
513            readers,
514            seq_entity: AtomicI64::new(seq_entity),
515            seq_obs: AtomicI64::new(seq_obs),
516            cache: Mutex::new(LruCache::new(lru_cache_size)),
517        })
518    }
519
520    fn next_entity_id(&self) -> i64 {
521        self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
522    }
523
524    fn next_obs_id(&self) -> i64 {
525        self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
526    }
527
528    fn meta_get(&self, name: &str) -> Option<EntityMeta> {
529        self.cache.lock().get(name).copied()
530    }
531
532    fn meta_set(&self, name: &str, m: EntityMeta) {
533        self.cache.lock().put(name.to_string(), m);
534    }
535
536    fn meta_remove(&self, name: &str) {
537        self.cache.lock().pop(name);
538    }
539
540    fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
541        let mut cache = self.cache.lock();
542        if let Some(m) = cache.get_mut(name) {
543            f(m);
544        }
545    }
546
547    fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
548        if let Some(m) = self.meta_get(name) {
549            return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
550        }
551        let h = name_hash(name);
552        let mut stmt = conn
553            .prepare_cached(
554                "SELECT id, type_id, obs_count, out_deg, in_deg
555                 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
556            )
557            .map_err(sqlite_err)?;
558        match stmt.query_row(params![h, name], |row| {
559            Ok(EntityMeta {
560                id: row.get(0)?,
561                type_id: row.get(1)?,
562                obs_count: row.get(2)?,
563                out_deg: row.get(3)?,
564                in_deg: row.get(4)?,
565            })
566        }) {
567            Ok(m) => {
568                self.meta_set(name, m);
569                Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
570            }
571            Err(e) if is_not_found(&e) => Ok(None),
572            Err(e) => Err(sqlite_err(e)),
573        }
574    }
575
576    fn sync_seqs(&self, conn: &Connection) -> Result<()> {
577        let seq_e = self.seq_entity.load(Ordering::Relaxed);
578        let seq_o = self.seq_obs.load(Ordering::Relaxed);
579        conn.execute(
580            "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
581             WHERE key IN ('entity_seq', 'obs_seq')",
582            params![seq_e, seq_o],
583        )
584        .map_err(sqlite_err)?;
585        Ok(())
586    }
587
588    // ── Public API ──────────────────────────────────────────────────────
589
590    pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
591        if name.is_empty() {
592            return Ok(None);
593        }
594
595        if let Some(m) = self.meta_get(name) {
596            let conn = self.readers.get();
597            let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
598            let observations = load_observations_opt(&conn, m.id);
599            return Ok(Some(Entity {
600                name: name.to_string(),
601                entity_type: etype,
602                observations,
603            }));
604        }
605
606        let conn = self.readers.get();
607        let h = name_hash(name);
608        let mut stmt = conn
609            .prepare_cached(
610                "SELECT e.id, e.type_id, e.name, t.name,
611                        e.obs_count, e.out_deg, e.in_deg
612                 FROM entity e
613                 JOIN type_dict t ON t.id = e.type_id
614                 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
615            )
616            .map_err(sqlite_err)?;
617        match stmt.query_row(params![h, name], |row| {
618            let id: i64 = row.get(0)?;
619            let type_id: i64 = row.get(1)?;
620            let ename: String = row.get(2)?;
621            let etype: String = row.get(3)?;
622            let obs_count: i64 = row.get(4)?;
623            let out_deg: i64 = row.get(5)?;
624            let in_deg: i64 = row.get(6)?;
625            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
626        }) {
627            Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
628                let observations = load_observations_opt(&conn, id);
629                drop(stmt);
630                drop(conn);
631                self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
632                Ok(Some(Entity {
633                    name: ename,
634                    entity_type: etype,
635                    observations,
636                }))
637            }
638            Err(e) if is_not_found(&e) => Ok(None),
639            Err(e) => Err(sqlite_err(e)),
640        }
641    }
642
    /// Insert `entities` in a single transaction and return clones of the ones
    /// that were actually created.
    ///
    /// Entities with an empty name are skipped, as are entities whose name
    /// already exists as a live (`flags = 0`) row; the latter is enforced by
    /// the `NOT EXISTS` guard of the INSERT, which also covers duplicates
    /// within the same batch. For each created entity its observations and
    /// its `name_fts` row are inserted as well. Type counts, graph statistics
    /// and the persisted id sequences are updated before the commit, and the
    /// metadata cache is populated only after the commit has succeeded.
    ///
    /// # Errors
    /// Any SQLite failure aborts the batch; the transaction guard rolls back
    /// everything written so far when it is dropped.
    pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
        let conn = self.writer.lock();
        let tx = TxGuard::begin(&conn)?;

        let mut ins_ent = conn
            .prepare_cached(
                "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
                 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
                 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
            )
            .map_err(sqlite_err)?;

        let mut ins_fts = conn
            .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
            .map_err(sqlite_err)?;

        // One timestamp for the whole batch.
        let batch_ts = now_us();
        // Type name -> type id, so each distinct type is resolved once per batch.
        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
        // Type id -> number of entities created with that type in this batch.
        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
        let mut total_entities: i64 = 0;
        let mut total_obs: i64 = 0;
        let mut created = Vec::new();
        let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
        // Reused buffer for the multi-row observation INSERT.
        let mut obs_sql = String::new();

        for entity in entities {
            if entity.name.is_empty() {
                continue;
            }
            let h = name_hash(&entity.name);
            // The id is drawn before the insert is attempted, so an entity that
            // turns out to exist already leaves a gap in the sequence.
            let id = self.next_entity_id();
            let type_id = match type_cache.get(entity.entity_type.as_str()) {
                Some(t) => *t,
                None => {
                    let t = get_type_id(&conn, &entity.entity_type, 0)?;
                    type_cache.insert(entity.entity_type.clone(), t);
                    t
                }
            };
            let obs_count = entity.observations.len() as i64;

            let changed = ins_ent
                .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
                .map_err(sqlite_err)?;
            // Zero rows changed: the NOT EXISTS guard found a live entity with
            // this name, so nothing was inserted.
            if changed == 0 {
                continue;
            }

            let n = entity.observations.len();
            if n > 0 {
                obs_sql.clear();

                // The ids and indices are materialized first so that the
                // parameter slice below can borrow them.
                let mut oids = Vec::with_capacity(n);
                let mut idxs = Vec::with_capacity(n);
                for _ in 0..n {
                    oids.push(self.next_obs_id());
                }
                for i in 0..n as i64 {
                    idxs.push(i);
                }

                // One multi-row INSERT with five placeholders per observation.
                obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
                for i in 0..n {
                    if i > 0 { obs_sql.push(','); }
                    obs_sql.push_str("(?,?,?,?,?)");
                }

                let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
                for (i, obs) in entity.observations.iter().enumerate() {
                    obs_params.push(&oids[i]);
                    obs_params.push(&id);
                    obs_params.push(&idxs[i]);
                    obs_params.push(obs);
                    obs_params.push(&batch_ts);
                }

                conn.execute(&obs_sql, obs_params.as_slice())
                    .map_err(sqlite_err)?;
            }

            ins_fts
                .execute(params![id, entity.name])
                .map_err(sqlite_err)?;

            *type_deltas.entry(type_id).or_insert(0) += 1;
            total_entities += 1;
            total_obs += obs_count;

            created.push(entity.clone());
            created_metas.push((entity.name.clone(), EntityMeta {
                id,
                type_id,
                obs_count,
                out_deg: 0,
                in_deg: 0,
            }));
        }

        if total_entities > 0 {
            for (type_id, delta) in &type_deltas {
                inc_type_count(&conn, *type_id, *delta)?;
            }
            inc_graph_stat(&conn, "entities", total_entities)?;
            inc_graph_stat(&conn, "observations", total_obs)?;
            self.sync_seqs(&conn)?;
        }

        tx.commit()?;

        // Note: `PRAGMA optimize` is intentionally *not* run here. It analyzes
        // indexes and writes to internal stat tables — pure overhead on the
        // write hot path. The periodic `run_maintenance` task covers it.

        // The cache is filled only after the commit succeeded.
        if !created_metas.is_empty() {
            let mut cache = self.cache.lock();
            for (name, meta) in &created_metas {
                cache.put(name.clone(), *meta);
            }
        }

        Ok(created)
    }
765
766    pub fn delete_entities(&self, names: &[String]) -> Result<()> {
767        if names.is_empty() {
768            return Ok(());
769        }
770        let conn = self.writer.lock();
771
772        // Phase 1: Resolve all names to (id, type_id).
773        let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
774        let mut sel = conn
775            .prepare_cached(
776                "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
777            )
778            .map_err(sqlite_err)?;
779        for name in names {
780            let h = name_hash(name);
781            let (id, type_id) = match sel.query_row(params![h, name], |row| {
782                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
783            }) {
784                Ok(v) => v,
785                Err(e) if is_not_found(&e) => continue,
786                Err(e) => return Err(sqlite_err(e)),
787            };
788            resolved.push((id, type_id, name.clone()));
789        }
790
791        if resolved.is_empty() {
792            return Ok(());
793        }
794
795        let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
796        let n = ids.len();
797
798        // Phase 2: Batch DELETE observations.
799        let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
800        let obs_sql = format!(
801            "DELETE FROM observation WHERE entity_id IN ({})",
802            obs_p.join(",")
803        );
804        let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
805        let obs_deleted = conn
806            .execute(&obs_sql, obs_refs.as_slice())
807            .map_err(sqlite_err)? as i64;
808
809        // Phase 3: Batch DELETE relations.
810        let rel_sql = format!(
811            "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
812            obs_p.join(","),
813            obs_p.join(",")
814        );
815        let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
816        let rel_deleted = conn
817            .execute(&rel_sql, rel_refs.as_slice())
818            .map_err(sqlite_err)? as i64;
819
820        // Phase 4: Batch FTS deletes.
821        let fts_values: Vec<String> = (0..n)
822            .map(|_| "('delete', ?, '')".to_string())
823            .collect();
824        let fts_sql = format!(
825            "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
826            fts_values.join(", ")
827        );
828        conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
829            .map_err(sqlite_err)?;
830
831        // Aggregate type count deltas.
832        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
833        for &(_, type_id, _) in &resolved {
834            *type_deltas.entry(type_id).or_insert(0) += 1;
835        }
836
837        // Phase 5: Batch type count decrements.
838        if !type_deltas.is_empty() {
839            let m = type_deltas.len();
840            let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
841            let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
842            let mut case_parts: Vec<String> = Vec::with_capacity(m);
843            let mut id_parts: Vec<String> = Vec::with_capacity(m);
844            for i in 0..m {
845                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
846                id_parts.push(format!("?{}", i + 1));
847            }
848            let sql = format!(
849                "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
850                case_parts.join(" "),
851                id_parts.join(","),
852            );
853            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
854            for id in &type_keys {
855                params.push(Box::new(*id));
856            }
857            for delta in &type_vals {
858                params.push(Box::new(*delta));
859            }
860            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
861            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
862        }
863
864        // Phase 6: Batch DELETE entities.
865        conn.execute(
866            &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
867            ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
868        )
869        .map_err(sqlite_err)?;
870
871        // Phase 7: Update stats.
872        inc_graph_stat(&conn, "entities", -(n as i64))?;
873        inc_graph_stat(&conn, "observations", -obs_deleted)?;
874        inc_graph_stat(&conn, "relations", -rel_deleted)?;
875
876        // Phase 8: Remove from cache.
877        for (_, _, name) in &resolved {
878            self.meta_remove(name);
879        }
880
881        Ok(())
882    }
883
884    pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
885        let conn = self.writer.lock();
886        let tx = TxGuard::begin(&conn)?;
887
888        let mut ins = conn
889            .prepare_cached(
890                "INSERT INTO relation (from_id, to_id, type_id, created_us)
891                 SELECT ?1, ?2, ?3, ?4
892                 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
893            )
894            .map_err(sqlite_err)?;
895
896        let ts = now_us();
897        let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
898        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
899        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
900        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
901        let mut total_relations: i64 = 0;
902        let mut created = Vec::new();
903
904        for rel in relations {
905            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
906                Some(v) => v,
907                None => continue,
908            };
909            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
910                Some(v) => v,
911                None => continue,
912            };
913            let type_id = match type_cache.get(rel.relation_type.as_str()) {
914                Some(t) => *t,
915                None => {
916                    let t = get_type_id(&conn, &rel.relation_type, 1)?;
917                    type_cache.insert(rel.relation_type.clone(), t);
918                    t
919                }
920            };
921
922            let changed = ins
923                .execute(params![from_id, to_id, type_id, ts])
924                .map_err(sqlite_err)?;
925            if changed == 0 {
926                continue;
927            }
928
929            *out_deltas.entry(from_id).or_insert(0) += 1;
930            *in_deltas.entry(to_id).or_insert(0) += 1;
931            *type_deltas.entry(type_id).or_insert(0) += 1;
932            total_relations += 1;
933
934            created.push(rel.clone());
935        }
936
937        if total_relations > 0 {
938            for (id, delta) in &out_deltas {
939                conn.execute(
940                    "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
941                    params![delta, id],
942                )
943                .map_err(sqlite_err)?;
944            }
945            for (id, delta) in in_deltas {
946                conn.execute(
947                    "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
948                    params![delta, id],
949                )
950                .map_err(sqlite_err)?;
951            }
952            for (type_id, delta) in &type_deltas {
953                inc_type_count(&conn, *type_id, *delta)?;
954            }
955            inc_graph_stat(&conn, "relations", total_relations)?;
956        }
957
958        tx.commit()?;
959
960        // See `create_entities`: `PRAGMA optimize` is deferred to maintenance.
961
962        if !created.is_empty() {
963            let mut cache = self.cache.lock();
964            for rel in &created {
965                if let Some(m) = cache.get_mut(&rel.from) {
966                    m.out_deg += 1;
967                }
968                if let Some(m) = cache.get_mut(&rel.to) {
969                    m.in_deg += 1;
970                }
971            }
972        }
973
974        Ok(created)
975    }
976
977    pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
978        if relations.is_empty() {
979            return Ok(());
980        }
981        let conn = self.writer.lock();
982
983        // Resolve names to IDs and collect valid triples.
984        let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
985        let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
986        for rel in relations {
987            let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
988                Some(v) => v,
989                None => continue,
990            };
991            let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
992                Some(v) => v,
993                None => continue,
994            };
995            let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
996                Ok(id) => id,
997                Err(_) => continue,
998            };
999            triples.push((from_id, to_id, type_id));
1000            names.push((rel.from.clone(), rel.to.clone()));
1001        }
1002
1003        if triples.is_empty() {
1004            return Ok(());
1005        }
1006
1007        // Batch DELETE using VALUES subquery.
1008        let mut sql = String::from(
1009            "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1010        );
1011        for (i, _) in triples.iter().enumerate() {
1012            if i > 0 {
1013                sql.push_str(", ");
1014            }
1015            let base = (i * 3) + 1;
1016            sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1017        }
1018        sql.push(')');
1019
1020        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1021        for &(f, t, tp) in &triples {
1022            param_values.push(Box::new(f));
1023            param_values.push(Box::new(t));
1024            param_values.push(Box::new(tp));
1025        }
1026        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1027        let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1028        if total == 0 {
1029            return Ok(());
1030        }
1031
1032        // Aggregate degree and type deltas.
1033        let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1034        let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1035        let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1036        for &(from_id, to_id, type_id) in &triples {
1037            *out_deltas.entry(from_id).or_insert(0) += 1;
1038            *in_deltas.entry(to_id).or_insert(0) += 1;
1039            *type_deltas.entry(type_id).or_insert(0) += 1;
1040        }
1041
1042        // Batch out_deg updates.
1043        let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1044        let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1045        if !out_keys.is_empty() {
1046            let m = out_keys.len();
1047            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1048            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1049            for i in 0..m {
1050                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1051                id_parts.push(format!("?{}", i + 1));
1052            }
1053            let sql = format!(
1054                "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1055                case_parts.join(" "),
1056                id_parts.join(","),
1057            );
1058            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1059            for id in &out_keys {
1060                params.push(Box::new(*id));
1061            }
1062            for delta in &out_vals {
1063                params.push(Box::new(*delta));
1064            }
1065            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1066            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1067        }
1068
1069        // Batch in_deg updates.
1070        let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1071        let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1072        if !in_keys.is_empty() {
1073            let m = in_keys.len();
1074            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1075            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1076            for i in 0..m {
1077                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1078                id_parts.push(format!("?{}", i + 1));
1079            }
1080            let sql = format!(
1081                "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1082                case_parts.join(" "),
1083                id_parts.join(","),
1084            );
1085            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1086            for id in &in_keys {
1087                params.push(Box::new(*id));
1088            }
1089            for delta in &in_vals {
1090                params.push(Box::new(*delta));
1091            }
1092            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1093            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1094        }
1095
1096        // Batch type_dict updates.
1097        let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1098        let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1099        if !type_keys.is_empty() {
1100            let m = type_keys.len();
1101            let mut case_parts: Vec<String> = Vec::with_capacity(m);
1102            let mut id_parts: Vec<String> = Vec::with_capacity(m);
1103            for i in 0..m {
1104                case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1105                id_parts.push(format!("?{}", i + 1));
1106            }
1107            let sql = format!(
1108                "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1109                case_parts.join(" "),
1110                id_parts.join(","),
1111            );
1112            let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1113            for id in &type_keys {
1114                params.push(Box::new(*id));
1115            }
1116            for delta in &type_vals {
1117                params.push(Box::new(*delta));
1118            }
1119            let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1120            conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1121        }
1122
1123        inc_graph_stat(&conn, "relations", -(total as i64))?;
1124
1125        // Update cache for resolved triples (self-heals on next reload if
1126        // a triple happened to not match).
1127        for (from, to) in &names {
1128            self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1129            self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1130        }
1131
1132        Ok(())
1133    }
1134
1135    pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1136        let conn = self.writer.lock();
1137        let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1138            Some(v) => v,
1139            None => {
1140                return Err(MCSError::InvalidParams(format!(
1141                    "Entity '{entity_name}' not found"
1142                )))
1143            }
1144        };
1145
1146        let mut max_idx: i64 = conn
1147            .query_row(
1148                "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1149                params![id],
1150                |row| row.get(0),
1151            )
1152            .map_err(sqlite_err)?;
1153
1154        let ts = now_us();
1155        let mut ins_obs = conn
1156            .prepare_cached(
1157                "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1158            )
1159            .map_err(sqlite_err)?;
1160
1161        for content in contents {
1162            max_idx += 1;
1163            let oid = self.next_obs_id();
1164            ins_obs
1165                .execute(params![oid, id, max_idx, content, ts])
1166                .map_err(sqlite_err)?;
1167        }
1168        let added = contents.to_vec();
1169
1170        let count: i64 = contents.len() as i64;
1171        conn.execute(
1172            "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1173            params![count, ts, id],
1174        )
1175        .map_err(sqlite_err)?;
1176
1177        inc_graph_stat(&conn, "observations", count)?;
1178        self.sync_seqs(&conn)?;
1179
1180        self.meta_update(entity_name, |m| m.obs_count += count);
1181
1182        Ok(added)
1183    }
1184
1185    pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1186        if observations.is_empty() {
1187            return Ok(());
1188        }
1189        let conn = self.writer.lock();
1190        let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1191            Some(v) => v,
1192            None => {
1193                return Err(MCSError::InvalidParams(format!(
1194                    "Entity '{entity_name}' not found"
1195                )))
1196            }
1197        };
1198
1199        let placeholders: Vec<String> = (0..observations.len())
1200            .map(|i| format!("?{}", i + 2))
1201            .collect();
1202        let sql = format!(
1203            "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1204            placeholders.join(",")
1205        );
1206
1207        let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1208        param_values.push(Box::new(id));
1209        for obs in observations {
1210            param_values.push(Box::new(obs.as_str()));
1211        }
1212        let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1213        let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1214
1215        if removed > 0 {
1216            conn.execute(
1217                "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1218                params![removed, now_us(), id],
1219            )
1220            .map_err(sqlite_err)?;
1221            inc_graph_stat(&conn, "observations", -removed)?;
1222
1223            self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1224        }
1225
1226        Ok(())
1227    }
1228
1229    pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1230        let mut results = Vec::new();
1231        for entity in entities {
1232            if let Some(existing) = self.get_entity(&entity.name)? {
1233                // Update type if different.
1234                if existing.entity_type != entity.entity_type {
1235                    let conn = self.writer.lock();
1236                    let old_type_id = conn
1237                        .query_row(
1238                            "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1239                            params![name_hash(&entity.name), entity.name],
1240                            |row| row.get::<_, i64>(0),
1241                        )
1242                        .map_err(sqlite_err)?;
1243                    let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1244                    inc_type_count(&conn, old_type_id, -1)?;
1245                    inc_type_count(&conn, new_type_id, 1)?;
1246                    conn.execute(
1247                        "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1248                        params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1249                    )
1250                    .map_err(sqlite_err)?;
1251                    // Invalidate cache so subsequent get_entity reloads meta.
1252                    self.meta_remove(&entity.name);
1253                }
1254                // Merge observations (append new ones not already present).
1255                let existing_set: HashSet<&str> =
1256                    existing.observations.iter().map(|s| s.as_str()).collect();
1257                let to_add: Vec<String> = entity
1258                    .observations
1259                    .iter()
1260                    .filter(|o| !existing_set.contains(o.as_str()))
1261                    .cloned()
1262                    .collect();
1263                if !to_add.is_empty() {
1264                    self.add_observations(&entity.name, &to_add)?;
1265                }
1266                let updated = self
1267                    .get_entity(&entity.name)?
1268                    .unwrap_or(entity.clone());
1269                results.push(updated);
1270            } else {
1271                let c = self.create_entities(std::slice::from_ref(entity))?;
1272                if let Some(e) = c.into_iter().next() {
1273                    results.push(e);
1274                }
1275            }
1276        }
1277        Ok(results)
1278    }
1279
1280    pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1281        let conn = self.writer.lock();
1282        let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1283            Some(v) => v,
1284            None => {
1285                return Err(MCSError::InvalidParams(format!(
1286                    "Source entity '{source}' not found"
1287                )))
1288            }
1289        };
1290        let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1291            Some(v) => v,
1292            None => {
1293                return Err(MCSError::InvalidParams(format!(
1294                    "Target entity '{target}' not found"
1295                )))
1296            }
1297        };
1298
1299        if src_id == tgt_id {
1300            return self.get_entity(target)?.ok_or_else(|| {
1301                MCSError::InvalidParams("Target entity not found after merge".into())
1302            });
1303        }
1304
1305        // Move observations from source to target.
1306        let mut obs_count: i64 = 0;
1307        {
1308            let mut max_idx: i64 = conn
1309                .query_row(
1310                    "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1311                    params![tgt_id],
1312                    |row| row.get(0),
1313                )
1314                .map_err(sqlite_err)?;
1315            let mut sel_obs = conn
1316                .prepare_cached(
1317                    "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1318                )
1319                .map_err(sqlite_err)?;
1320            let mut upd_obs = conn
1321                .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1322                .map_err(sqlite_err)?;
1323            let rows: Vec<(i64, String)> = sel_obs
1324                .query_map(params![src_id], |row| {
1325                    Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1326                })
1327                .map_err(sqlite_err)?
1328                .filter_map(|r| r.ok())
1329                .collect();
1330            for (oid, _body) in &rows {
1331                max_idx += 1;
1332                upd_obs
1333                    .execute(params![tgt_id, max_idx, oid])
1334                    .map_err(sqlite_err)?;
1335                obs_count += 1;
1336            }
1337        }
1338
1339        // Move relations from source to target.
1340        conn.execute(
1341            "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1342            params![tgt_id, src_id],
1343        )
1344        .map_err(sqlite_err)?;
1345        conn.execute(
1346            "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1347            params![tgt_id, src_id],
1348        )
1349        .map_err(sqlite_err)?;
1350        // Delete orphaned relations that were updated by the above (the "OR IGNORE"
1351        // keeps the first, but we still have the original row with the old id? No —
1352        // UPDATE OR IGNORE won't remove. So we must delete any that still reference src_id.)
1353        conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1354            .map_err(sqlite_err)?;
1355        conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1356            .map_err(sqlite_err)?;
1357
1358        // Update degrees on target.
1359        let out_add: i64 = conn
1360            .query_row(
1361                "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1362                params![tgt_id],
1363                |row| row.get(0),
1364            )
1365            .map_err(sqlite_err)?;
1366        let in_add: i64 = conn
1367            .query_row(
1368                "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1369                params![tgt_id],
1370                |row| row.get(0),
1371            )
1372            .map_err(sqlite_err)?;
1373        conn.execute(
1374            "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1375            params![out_add, in_add, obs_count, now_us(), tgt_id],
1376        )
1377        .map_err(sqlite_err)?;
1378
1379        // Delete source entity.
1380        conn.execute(
1381            "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1382            params![src_id],
1383        )
1384        .map_err(sqlite_err)?;
1385        conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1386            .map_err(sqlite_err)?;
1387
1388        inc_graph_stat(&conn, "entities", -1)?;
1389        self.meta_remove(source);
1390
1391        // Reload target into cache.
1392        if let Ok(meta) = conn.query_row(
1393            "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1394            params![tgt_id],
1395            |row| {
1396                Ok(EntityMeta {
1397                    id: row.get(0)?,
1398                    type_id: row.get(1)?,
1399                    obs_count: row.get(2)?,
1400                    out_deg: row.get(3)?,
1401                    in_deg: row.get(4)?,
1402                })
1403            },
1404        ) {
1405            self.meta_set(target, meta);
1406        }
1407
1408        let (name, etype): (String, String) = conn
1409            .query_row(
1410                "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1411                params![tgt_id],
1412                |row| Ok((row.get(0)?, row.get(1)?)),
1413            )
1414            .map_err(sqlite_err)?;
1415        let observations = load_observations_opt(&conn, tgt_id);
1416
1417        Ok(Entity {
1418            name,
1419            entity_type: etype,
1420            observations,
1421        })
1422    }
1423
1424    pub fn search_nodes_filtered(
1425        &self,
1426        query: &str,
1427        filter_type: Option<&str>,
1428        offset: usize,
1429        limit: usize,
1430    ) -> Vec<Entity> {
1431        if query.is_empty() {
1432            return Vec::new();
1433        }
1434        let conn = self.readers.get();
1435
1436        // Single pass: collect IDs from name_fts then obs_fts, deduping with a set.
1437        let mut entity_ids: Vec<i64> = Vec::new();
1438        let mut seen: HashSet<i64> = HashSet::new();
1439
1440        if let Ok(mut stmt) = conn.prepare(
1441            "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1442        ) {
1443            let limit_i64 = (limit + offset) as i64;
1444            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1445                row.get::<_, i64>(0)
1446            }) {
1447                for row in rows.flatten() {
1448                    if seen.insert(row) {
1449                        entity_ids.push(row);
1450                    }
1451                }
1452            }
1453        }
1454
1455        if let Ok(mut stmt) = conn.prepare(
1456            "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1457             WHERE obs_fts MATCH ?1
1458             GROUP BY entity_id
1459             LIMIT ?2",
1460        ) {
1461            let limit_i64 = (limit + offset) as i64;
1462            if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1463                row.get::<_, i64>(0)
1464            }) {
1465                for row in rows.flatten() {
1466                    if seen.insert(row) {
1467                        entity_ids.push(row);
1468                    }
1469                }
1470            }
1471        }
1472
1473        // Apply filter_type, offset, limit.
1474        let mut results = Vec::new();
1475        let mut count: usize = 0;
1476        for eid in entity_ids {
1477            if let Ok(entity) = entity_by_id(&conn, eid) {
1478                if let Some(ft) = filter_type
1479                    && !ft.is_empty() && entity.entity_type != ft {
1480                        continue;
1481                    }
1482                if count < offset {
1483                    count += 1;
1484                    continue;
1485                }
1486                if results.len() >= limit {
1487                    break;
1488                }
1489                results.push(entity);
1490                count += 1;
1491            }
1492        }
1493
1494        results
1495    }
1496
1497    pub fn read_graph_filtered(
1498        &self,
1499        filter_type: Option<&str>,
1500        offset: usize,
1501        limit: usize,
1502    ) -> Result<String> {
1503        let conn = self.readers.get();
1504
1505        let limit_sql: i64 = if limit == usize::MAX {
1506            -1
1507        } else {
1508            limit.min(i64::MAX as usize) as i64
1509        };
1510        let offset_sql: i64 = offset as i64;
1511
1512        // Resolve the requested page of entity ids first. Relations are then
1513        // scoped to edges whose *both* endpoints fall inside this page, which
1514        // keeps the response self-consistent (no dangling references to
1515        // entities that were paged out) and bounds the relation payload by the
1516        // page size instead of dumping every relation in the graph.
1517        let filter = filter_type.filter(|ft| !ft.is_empty());
1518        let ids: Vec<i64> = if let Some(ft) = filter {
1519            let mut stmt = conn
1520                .prepare_cached(
1521                    "SELECT e.id FROM entity e
1522                     WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1523                       AND e.flags = 0
1524                     ORDER BY e.id LIMIT ?2 OFFSET ?3",
1525                )
1526                .map_err(sqlite_err)?;
1527            stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1528                .map_err(sqlite_err)?
1529                .filter_map(|r| r.ok())
1530                .collect()
1531        } else {
1532            let mut stmt = conn
1533                .prepare_cached(
1534                    "SELECT e.id FROM entity e WHERE e.flags = 0
1535                     ORDER BY e.id LIMIT ?1 OFFSET ?2",
1536                )
1537                .map_err(sqlite_err)?;
1538            stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1539                .map_err(sqlite_err)?
1540                .filter_map(|r| r.ok())
1541                .collect()
1542        };
1543
1544        if ids.is_empty() {
1545            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1546        }
1547
1548        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1549
1550        let entities_json: String = {
1551            let sql = format!(
1552                "SELECT COALESCE(json_group_array(json_object(
1553                    'name', e.name,
1554                    'entityType', t.name,
1555                    'observations', COALESCE((
1556                        SELECT json_group_array(o.body ORDER BY o.idx)
1557                        FROM observation o WHERE o.entity_id = e.id
1558                    ), json('[]'))
1559                ) ORDER BY e.id), json('[]'))
1560                FROM entity e
1561                JOIN type_dict t ON t.id = e.type_id
1562                WHERE e.id IN ({placeholders}) AND e.flags = 0"
1563            );
1564            conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1565                row.get::<_, String>(0)
1566            })
1567            .map_err(sqlite_err)?
1568        };
1569
1570        let relations_json: String = {
1571            let sql = format!(
1572                "SELECT COALESCE(json_group_array(json_object(
1573                    'from', e1.name,
1574                    'to', e2.name,
1575                    'relationType', t.name
1576                )), json('[]'))
1577                FROM relation r
1578                JOIN entity e1 ON e1.id = r.from_id
1579                JOIN entity e2 ON e2.id = r.to_id
1580                JOIN type_dict t ON t.id = r.type_id
1581                WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1582                  AND e1.flags = 0 AND e2.flags = 0"
1583            );
1584            let all_params: Vec<&dyn ToSql> = ids
1585                .iter()
1586                .map(|id| id as &dyn ToSql)
1587                .chain(ids.iter().map(|id| id as &dyn ToSql))
1588                .collect();
1589            conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1590                .map_err(sqlite_err)?
1591        };
1592
1593        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1594        out.push_str("{\"entities\":");
1595        out.push_str(&entities_json);
1596        out.push_str(",\"relations\":");
1597        out.push_str(&relations_json);
1598        out.push('}');
1599        Ok(out)
1600    }
1601
1602    pub fn open_nodes(&self, names: &[String]) -> String {
1603        let conn = self.readers.get();
1604        let mut entity_ids: Vec<i64> = Vec::new();
1605
1606        for name in names {
1607            let h = name_hash(name);
1608            if let Ok(Some(id)) = conn
1609                .query_row(
1610                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1611                    params![h, name],
1612                    |row| row.get::<_, i64>(0),
1613                )
1614                .map(Some)
1615                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1616            {
1617                entity_ids.push(id);
1618            }
1619        }
1620
1621        if entity_ids.is_empty() {
1622            return r#"{"entities":[],"relations":[]}"#.to_string();
1623        }
1624
1625        let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1626        let ids_str = placeholders.join(",");
1627
1628        let entities_json: String = {
1629            let sql = format!(
1630                "SELECT COALESCE(json_group_array(json_object(
1631                    'name', e.name,
1632                    'entityType', t.name,
1633                    'observations', COALESCE((
1634                        SELECT json_group_array(o.body ORDER BY o.idx)
1635                        FROM observation o WHERE o.entity_id = e.id
1636                    ), json('[]'))
1637                ) ORDER BY e.id), json('[]'))
1638                FROM entity e
1639                JOIN type_dict t ON t.id = e.type_id
1640                WHERE e.id IN ({ids_str}) AND e.flags = 0"
1641            );
1642            conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1643                row.get::<_, String>(0)
1644            })
1645            .unwrap_or_else(|_| "[]".to_string())
1646        };
1647
1648        let relations_json: String = {
1649            let sql = format!(
1650                "SELECT COALESCE(json_group_array(json_object(
1651                    'from', e1.name,
1652                    'to', e2.name,
1653                    'relationType', t.name
1654                )), json('[]'))
1655                FROM relation r
1656                JOIN entity e1 ON e1.id = r.from_id
1657                JOIN entity e2 ON e2.id = r.to_id
1658                JOIN type_dict t ON t.id = r.type_id
1659                WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1660                  AND e1.flags = 0 AND e2.flags = 0"
1661            );
1662            let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1663                .iter()
1664                .map(|id| id as &dyn rusqlite::types::ToSql)
1665                .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1666                .collect();
1667            let mut stmt = conn.prepare(&sql).unwrap();
1668            stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1669                .unwrap_or_else(|_| "[]".to_string())
1670        };
1671
1672        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1673        out.push_str("{\"entities\":");
1674        out.push_str(&entities_json);
1675        out.push_str(",\"relations\":");
1676        out.push_str(&relations_json);
1677        out.push('}');
1678        out
1679    }
1680
1681    pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1682        let conn = self.readers.get();
1683        let mut results = Vec::with_capacity(names.len());
1684        for name in names {
1685            let h = name_hash(name);
1686            let exists: bool = conn
1687                .query_row(
1688                    "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1689                    params![h, name],
1690                    |_| Ok(()),
1691                )
1692                .is_ok();
1693            results.push(exists);
1694        }
1695        Ok(results)
1696    }
1697
1698    pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1699        let conn = self.readers.get();
1700        let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1701            Some(v) => v,
1702            None => {
1703                return Err(MCSError::InvalidParams(format!(
1704                    "Entity '{name}' not found"
1705                )))
1706            }
1707        };
1708        Ok(match direction {
1709            Direction::Outgoing => out_d as usize,
1710            Direction::Incoming => in_d as usize,
1711            Direction::Both => (out_d + in_d) as usize,
1712        })
1713    }
1714
1715    pub fn get_entity_count(&self) -> Result<usize> {
1716        let conn = self.readers.get();
1717        read_graph_stat(&conn, "entities")
1718            .map(|v| v as usize)
1719            .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1720    }
1721
1722    pub fn get_relation_count(&self) -> Result<usize> {
1723        let conn = self.readers.get();
1724        read_graph_stat(&conn, "relations")
1725            .map(|v| v as usize)
1726            .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1727    }
1728
1729    pub fn search_relations(
1730        &self,
1731        from: Option<&str>,
1732        to: Option<&str>,
1733        rtype: Option<&str>,
1734    ) -> Vec<Relation> {
1735        let conn = self.readers.get();
1736        let mut results = Vec::new();
1737
1738        // A filter that is supplied but resolves to nothing uses the sentinel
1739        // id -1 (which matches no row), so the query returns empty rather than
1740        // silently dropping the filter and matching every relation. The lookups
1741        // are read-only — `get_type_id` would *insert* a phantom type, which is
1742        // both wrong and impossible on a `query_only` reader connection.
1743        let from_id = from
1744            .filter(|f| !f.is_empty())
1745            .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1746        let to_id = to
1747            .filter(|t| !t.is_empty())
1748            .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1749        let type_id = rtype
1750            .filter(|rt| !rt.is_empty())
1751            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1752
1753        match (from_id, to_id, type_id) {
1754            (Some(fid), Some(tid), Some(tpid)) => {
1755                if let Ok(mut stmt) = conn.prepare_cached(
1756                    "SELECT e1.name, e2.name, t.name
1757                     FROM relation r
1758                     JOIN entity e1 ON e1.id = r.from_id
1759                     JOIN entity e2 ON e2.id = r.to_id
1760                     JOIN type_dict t ON t.id = r.type_id
1761                     WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1762                       AND e1.flags = 0 AND e2.flags = 0
1763                     ORDER BY r.from_id, r.to_id"
1764                )
1765                    && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1766                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1767                    }) {
1768                        for row in rows.flatten() { results.push(row); }
1769                    }
1770            }
1771            (Some(fid), Some(tid), None) => {
1772                if let Ok(mut stmt) = conn.prepare_cached(
1773                    "SELECT e1.name, e2.name, t.name
1774                     FROM relation r
1775                     JOIN entity e1 ON e1.id = r.from_id
1776                     JOIN entity e2 ON e2.id = r.to_id
1777                     JOIN type_dict t ON t.id = r.type_id
1778                     WHERE r.from_id = ?1 AND r.to_id = ?2
1779                       AND e1.flags = 0 AND e2.flags = 0
1780                     ORDER BY r.from_id, r.to_id"
1781                )
1782                    && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1783                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1784                    }) {
1785                        for row in rows.flatten() { results.push(row); }
1786                    }
1787            }
1788            (Some(fid), None, Some(tpid)) => {
1789                if let Ok(mut stmt) = conn.prepare_cached(
1790                    "SELECT e1.name, e2.name, t.name
1791                     FROM relation r
1792                     JOIN entity e1 ON e1.id = r.from_id
1793                     JOIN entity e2 ON e2.id = r.to_id
1794                     JOIN type_dict t ON t.id = r.type_id
1795                     WHERE r.from_id = ?1 AND r.type_id = ?2
1796                       AND e1.flags = 0 AND e2.flags = 0
1797                     ORDER BY r.from_id, r.to_id"
1798                )
1799                    && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1800                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1801                    }) {
1802                        for row in rows.flatten() { results.push(row); }
1803                    }
1804            }
1805            (None, Some(tid), Some(tpid)) => {
1806                if let Ok(mut stmt) = conn.prepare_cached(
1807                    "SELECT e1.name, e2.name, t.name
1808                     FROM relation r
1809                     JOIN entity e1 ON e1.id = r.from_id
1810                     JOIN entity e2 ON e2.id = r.to_id
1811                     JOIN type_dict t ON t.id = r.type_id
1812                     WHERE r.to_id = ?1 AND r.type_id = ?2
1813                       AND e1.flags = 0 AND e2.flags = 0
1814                     ORDER BY r.from_id, r.to_id"
1815                )
1816                    && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1817                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1818                    }) {
1819                        for row in rows.flatten() { results.push(row); }
1820                    }
1821            }
1822            (Some(fid), None, None) => {
1823                if let Ok(mut stmt) = conn.prepare_cached(
1824                    "SELECT e1.name, e2.name, t.name
1825                     FROM relation r
1826                     JOIN entity e1 ON e1.id = r.from_id
1827                     JOIN entity e2 ON e2.id = r.to_id
1828                     JOIN type_dict t ON t.id = r.type_id
1829                     WHERE r.from_id = ?1
1830                       AND e1.flags = 0 AND e2.flags = 0
1831                     ORDER BY r.from_id, r.to_id"
1832                )
1833                    && let Ok(rows) = stmt.query_map(params![fid], |row| {
1834                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1835                    }) {
1836                        for row in rows.flatten() { results.push(row); }
1837                    }
1838            }
1839            (None, Some(tid), None) => {
1840                if let Ok(mut stmt) = conn.prepare_cached(
1841                    "SELECT e1.name, e2.name, t.name
1842                     FROM relation r
1843                     JOIN entity e1 ON e1.id = r.from_id
1844                     JOIN entity e2 ON e2.id = r.to_id
1845                     JOIN type_dict t ON t.id = r.type_id
1846                     WHERE r.to_id = ?1
1847                       AND e1.flags = 0 AND e2.flags = 0
1848                     ORDER BY r.from_id, r.to_id"
1849                )
1850                    && let Ok(rows) = stmt.query_map(params![tid], |row| {
1851                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1852                    }) {
1853                        for row in rows.flatten() { results.push(row); }
1854                    }
1855            }
1856            (None, None, Some(tpid)) => {
1857                if let Ok(mut stmt) = conn.prepare_cached(
1858                    "SELECT e1.name, e2.name, t.name
1859                     FROM relation r
1860                     JOIN entity e1 ON e1.id = r.from_id
1861                     JOIN entity e2 ON e2.id = r.to_id
1862                     JOIN type_dict t ON t.id = r.type_id
1863                     WHERE r.type_id = ?1
1864                       AND e1.flags = 0 AND e2.flags = 0
1865                     ORDER BY r.from_id, r.to_id"
1866                )
1867                    && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1868                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1869                    }) {
1870                        for row in rows.flatten() { results.push(row); }
1871                    }
1872            }
1873            (None, None, None) => {
1874                if let Ok(mut stmt) = conn.prepare_cached(
1875                    "SELECT e1.name, e2.name, t.name
1876                     FROM relation r
1877                     JOIN entity e1 ON e1.id = r.from_id
1878                     JOIN entity e2 ON e2.id = r.to_id
1879                     JOIN type_dict t ON t.id = r.type_id
1880                     WHERE e1.flags = 0 AND e2.flags = 0
1881                     ORDER BY r.from_id, r.to_id"
1882                )
1883                    && let Ok(rows) = stmt.query_map([], |row| {
1884                        Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1885                    }) {
1886                        for row in rows.flatten() { results.push(row); }
1887                    }
1888            }
1889        }
1890        results
1891    }
1892
1893    pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1894        let conn = self.readers.get();
1895        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1896            Some(v) => v,
1897            None => {
1898                return Err(MCSError::InvalidParams(format!(
1899                    "Source entity '{from}' not found"
1900                )))
1901            }
1902        };
1903        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1904            Some(v) => v,
1905            None => {
1906                return Err(MCSError::InvalidParams(format!(
1907                    "Target entity '{to}' not found"
1908                )))
1909            }
1910        };
1911
1912        if from_id == to_id {
1913            return Ok(Some(vec![from.to_string()]));
1914        }
1915
1916        // BFS with adjacency from relation table.
1917        let mut visited = HashSet::new();
1918        let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1919        let mut queue = VecDeque::new();
1920        visited.insert(from_id);
1921        queue.push_back(from_id);
1922
1923        while let Some(cur) = queue.pop_front() {
1924            if cur == to_id {
1925                break;
1926            }
1927            // Fetch out-neighbors.
1928            if let Ok(mut stmt) =
1929                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1930                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1931                    for row in rows.flatten() {
1932                        if visited.insert(row) {
1933                            parent.insert(row, cur);
1934                            queue.push_back(row);
1935                        }
1936                    }
1937                }
1938            // Also check in-neighbors (undirected traversal).
1939            if let Ok(mut stmt) =
1940                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1941                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1942                    for row in rows.flatten() {
1943                        if visited.insert(row) {
1944                            parent.insert(row, cur);
1945                            queue.push_back(row);
1946                        }
1947                    }
1948                }
1949        }
1950
1951        if !parent.contains_key(&to_id) && to_id != from_id {
1952            return Ok(None);
1953        }
1954
1955        let mut path = Vec::new();
1956        let mut cur = to_id;
1957        path.push(cur);
1958        while let Some(&p) = parent.get(&cur) {
1959            path.push(p);
1960            cur = p;
1961            if cur == from_id {
1962                break;
1963            }
1964        }
1965        path.reverse();
1966
1967        let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1968        let sql = format!(
1969            "SELECT id, name FROM entity WHERE id IN ({})",
1970            placeholders.join(",")
1971        );
1972        let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1973            && let Ok(rows) = stmt.query_map(
1974                rusqlite::params_from_iter(&path),
1975                |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1976            ) {
1977            rows.flatten().collect()
1978        } else {
1979            FxHashMap::default()
1980        };
1981
1982        let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
1983
1984        Ok(Some(name_path))
1985    }
1986
1987    pub fn compact(&self) -> Result<()> {
1988        let conn = self.writer.lock();
1989        conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
1990        Ok(())
1991    }
1992
    /// Thin wrapper over `_traverse`: forwards `name`, `direction`, `rtype`
    /// and `depth` unchanged and passes `true` as the final argument.
    // NOTE(review): the meaning of the trailing `true` flag and the format of
    // the returned string are defined by `_traverse`, which is not visible
    // from here — confirm against its definition.
    pub fn neighbors(
        &self,
        name: &str,
        direction: Direction,
        rtype: Option<&str>,
        depth: u32,
    ) -> Result<String> {
        self._traverse(name, direction, rtype, depth, true)
    }
2002
2003    pub fn extract_subgraph(
2004        &self,
2005        names: &[String],
2006        depth: u32,
2007    ) -> Result<String> {
2008        if names.is_empty() {
2009            return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2010        }
2011
2012        let conn = self.readers.get();
2013        let mut all_entity_ids: HashSet<i64> = HashSet::new();
2014        let mut frontier: HashSet<i64> = HashSet::new();
2015        let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2016
2017        // Resolve seed entities.
2018        for name in names {
2019            let h = name_hash(name);
2020            if let Ok(Some(id)) = conn
2021                .query_row(
2022                    "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2023                    params![h, name],
2024                    |row| row.get::<_, i64>(0),
2025                )
2026                .map(Some)
2027                .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2028            {
2029                all_entity_ids.insert(id);
2030                frontier.insert(id);
2031            }
2032        }
2033
2034        let mut current_depth = 0u32;
2035        while current_depth < depth && !frontier.is_empty() {
2036            let mut next_frontier: HashSet<i64> = HashSet::new();
2037
2038            // Collect relations for current frontier — batched IN queries
2039            // instead of one query per frontier entity.
2040            const CHUNK: usize = 500;
2041            let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2042            for chunk in frontier_ids.chunks(CHUNK) {
2043                let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2044                let in_clause = placeholders.join(",");
2045
2046                // Forward: from_id IN chunk.
2047                if let Ok(mut stmt) = conn.prepare(
2048                    &format!(
2049                        "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2050                    )
2051                )
2052                    && let Ok(rows) = stmt.query_map(
2053                        rusqlite::params_from_iter(chunk),
2054                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2055                    )
2056                {
2057                    for row in rows.flatten() {
2058                        let (from_id, to_id, type_id) = row;
2059                        all_rel_pairs.insert((from_id, to_id, type_id));
2060                        if all_entity_ids.insert(to_id) {
2061                            next_frontier.insert(to_id);
2062                        }
2063                    }
2064                }
2065
2066                // Backward: to_id IN chunk.
2067                if let Ok(mut stmt) = conn.prepare(
2068                    &format!(
2069                        "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2070                    )
2071                )
2072                    && let Ok(rows) = stmt.query_map(
2073                        rusqlite::params_from_iter(chunk),
2074                        |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2075                    )
2076                {
2077                    for row in rows.flatten() {
2078                        let (from_id, to_id, type_id) = row;
2079                        all_rel_pairs.insert((from_id, to_id, type_id));
2080                        if all_entity_ids.insert(from_id) {
2081                            next_frontier.insert(from_id);
2082                        }
2083                    }
2084                }
2085            }
2086            frontier = next_frontier;
2087            current_depth += 1;
2088        }
2089
2090        let entities_json: String = {
2091            if all_entity_ids.is_empty() {
2092                "[]".to_string()
2093            } else {
2094                let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2095                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2096                let sql = format!(
2097                    "SELECT COALESCE(json_group_array(json_object(
2098                        'name', e.name,
2099                        'entityType', t.name,
2100                        'observations', COALESCE((
2101                            SELECT json_group_array(o.body ORDER BY o.idx)
2102                            FROM observation o WHERE o.entity_id = e.id
2103                        ), json('[]'))
2104                    ) ORDER BY e.id), json('[]'))
2105                    FROM entity e
2106                    JOIN type_dict t ON t.id = e.type_id
2107                    WHERE e.id IN ({}) AND e.flags = 0",
2108                    placeholders.join(",")
2109                );
2110                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2111                    row.get::<_, String>(0)
2112                })
2113                .unwrap_or_else(|_| "[]".to_string())
2114            }
2115        };
2116
2117        let relations_json: String = {
2118            if all_rel_pairs.is_empty() {
2119                "[]".to_string()
2120            } else {
2121                let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2122                let sql = format!(
2123                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2124                    SELECT COALESCE(json_group_array(json_object(
2125                        'from', e1.name,
2126                        'to', e2.name,
2127                        'relationType', t.name
2128                    )), json('[]'))
2129                    FROM r
2130                    JOIN entity e1 ON e1.id = r.from_id
2131                    JOIN entity e2 ON e2.id = r.to_id
2132                    JOIN type_dict t ON t.id = r.type_id
2133                    WHERE e1.flags = 0 AND e2.flags = 0",
2134                    vals.join(", ")
2135                );
2136                let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2137                    .flat_map(|(f, t, tp)| {
2138                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2139                    })
2140                    .collect();
2141                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2142                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2143                    .unwrap_or_else(|_| "[]".to_string())
2144            }
2145        };
2146
2147        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2148        out.push_str("{\"entities\":");
2149        out.push_str(&entities_json);
2150        out.push_str(",\"relations\":");
2151        out.push_str(&relations_json);
2152        out.push('}');
2153        Ok(out)
2154    }
2155
2156    pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2157        self.get_entity(name)?
2158            .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2159    }
2160
2161    pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2162        let conn = self.readers.get();
2163        select_all_types(&conn, 0).unwrap_or_default()
2164    }
2165
2166    pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2167        let conn = self.readers.get();
2168        select_all_types(&conn, 1).unwrap_or_default()
2169    }
2170
2171    pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2172        names
2173            .iter()
2174            .map(|n| self.get_entity(n).unwrap_or(None))
2175            .collect()
2176    }
2177
2178    pub fn find_all_paths(
2179        &self,
2180        from: &str,
2181        to: &str,
2182        max_depth: usize,
2183        max_paths: usize,
2184    ) -> Result<Vec<Vec<String>>> {
2185        let conn = self.readers.get();
2186        let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2187            Some(v) => v,
2188            None => {
2189                return Err(MCSError::InvalidParams(format!(
2190                    "Source entity '{from}' not found"
2191                )))
2192            }
2193        };
2194        let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2195            Some(v) => v,
2196            None => {
2197                return Err(MCSError::InvalidParams(format!(
2198                    "Target entity '{to}' not found"
2199                )))
2200            }
2201        };
2202
2203        if from_id == to_id {
2204            return Ok(vec![vec![from.to_string()]]);
2205        }
2206
2207        // BFS enumerating all paths up to max_depth.
2208        let mut all_paths: Vec<Vec<i64>> = Vec::new();
2209        let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2210        queue.push_back((from_id, vec![from_id]));
2211
2212        const MAX_QUEUE_SIZE: usize = 10_000_000;
2213
2214        while let Some((cur, path)) = queue.pop_front() {
2215            if all_paths.len() >= max_paths {
2216                break;
2217            }
2218            if path.len() > max_depth {
2219                continue;
2220            }
2221
2222            // Out-neighbors.
2223            if let Ok(mut stmt) =
2224                conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2225                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2226                    for next_id in rows.flatten() {
2227                        if next_id == to_id {
2228                            let mut full_path = path.clone();
2229                            full_path.push(next_id);
2230                            all_paths.push(full_path);
2231                            if all_paths.len() >= max_paths {
2232                                break;
2233                            }
2234                        } else if !path.contains(&next_id) && path.len() < max_depth {
2235                            if queue.len() >= MAX_QUEUE_SIZE {
2236                                return Err(MCSError::InvalidParams(
2237                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2238                                ));
2239                            }
2240                            let mut new_path = path.clone();
2241                            new_path.push(next_id);
2242                            queue.push_back((next_id, new_path));
2243                        }
2244                    }
2245                }
2246
2247            // In-neighbors (undirected).
2248            if let Ok(mut stmt) =
2249                conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2250                && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2251                    for next_id in rows.flatten() {
2252                        if next_id == to_id {
2253                            let mut full_path = path.clone();
2254                            full_path.push(next_id);
2255                            all_paths.push(full_path);
2256                            if all_paths.len() >= max_paths {
2257                                break;
2258                            }
2259                        } else if !path.contains(&next_id) && path.len() < max_depth {
2260                            if queue.len() >= MAX_QUEUE_SIZE {
2261                                return Err(MCSError::InvalidParams(
2262                                    "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2263                                ));
2264                            }
2265                            let mut new_path = path.clone();
2266                            new_path.push(next_id);
2267                            queue.push_back((next_id, new_path));
2268                        }
2269                    }
2270                }
2271        }
2272
2273        // Convert ids to names — one batch query instead of N lookups per path.
2274        let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2275        let id_list: Vec<i64> = all_ids.into_iter().collect();
2276        let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2277            FxHashMap::default()
2278        } else {
2279            let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2280            let sql = format!(
2281                "SELECT id, name FROM entity WHERE id IN ({})",
2282                placeholders.join(",")
2283            );
2284            if let Ok(mut stmt) = conn.prepare(&sql)
2285                && let Ok(rows) = stmt.query_map(
2286                    rusqlite::params_from_iter(&id_list),
2287                    |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2288                ) {
2289                rows.flatten().collect()
2290            } else {
2291                FxHashMap::default()
2292            }
2293        };
2294
2295        let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2296        for path_ids in all_paths {
2297            let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2298            named_paths.push(named);
2299        }
2300
2301        Ok(named_paths)
2302    }
2303
2304    /// Export the whole graph as a JSON string. `max_rows` caps both the entity
2305    /// and relation arrays so a pathologically large graph cannot be coerced
2306    /// into an unbounded in-memory string (DoS guard); callers pass a generous
2307    /// constant. A negative value means "no limit".
2308    pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2309        let conn = self.readers.get();
2310        // Only JSON is supported; the format argument is accepted for forward
2311        // compatibility.
2312        conn.query_row(
2313            "SELECT json_object(
2314                'entities', COALESCE((
2315                    SELECT json_group_array(json_object(
2316                        'name', e.name,
2317                        'entityType', t.name,
2318                        'observations', COALESCE((
2319                            SELECT json_group_array(o.body ORDER BY o.idx)
2320                            FROM observation o WHERE o.entity_id = e.id
2321                        ), json('[]'))
2322                    ) ORDER BY e.id)
2323                    FROM (
2324                        SELECT id, name, type_id FROM entity
2325                        WHERE flags = 0 ORDER BY id LIMIT ?1
2326                    ) e
2327                    JOIN type_dict t ON t.id = e.type_id
2328                ), json('[]')),
2329                'relations', COALESCE((
2330                    SELECT json_group_array(json_object(
2331                        'from', e1.name,
2332                        'to', e2.name,
2333                        'relationType', t.name
2334                    ))
2335                    FROM (
2336                        SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2337                    ) r
2338                    JOIN entity e1 ON e1.id = r.from_id
2339                    JOIN entity e2 ON e2.id = r.to_id
2340                    JOIN type_dict t ON t.id = r.type_id
2341                    WHERE e1.flags = 0 AND e2.flags = 0
2342                ), json('[]'))
2343            )",
2344            params![max_rows],
2345            |row| row.get::<_, String>(0),
2346        )
2347        .map_err(sqlite_err)
2348    }
2349
2350    pub fn wipe(&self) -> Result<()> {
2351        let conn = self.writer.lock();
2352        // `name_fts`/`obs_fts` are external-content FTS5 tables; the supported
2353        // way to empty them is the special `'delete-all'` command, not a bare
2354        // `DELETE FROM` (which is invalid for external-content tables and can
2355        // leave the index inconsistent). Run it after clearing the content rows.
2356        conn.execute_batch(
2357            "DELETE FROM observation;
2358             DELETE FROM relation;
2359             DELETE FROM entity;
2360             DELETE FROM type_dict;
2361             INSERT INTO name_fts(name_fts) VALUES('delete-all');
2362             INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2363             UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2364             UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2365        )
2366        .map_err(sqlite_err)?;
2367        self.seq_entity.store(0, Ordering::Relaxed);
2368        self.seq_obs.store(0, Ordering::Relaxed);
2369        self.cache.lock().clear();
2370        Ok(())
2371    }
2372
2373    /// Periodic database maintenance: WAL checkpoint, query planner analysis,
2374    /// and FTS index optimization. Call from a background timer.
2375    pub fn run_maintenance(&self) -> Result<()> {
2376        let conn = self.writer.lock();
2377
2378        conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2379            .map_err(sqlite_err)?;
2380
2381        conn.execute_batch("PRAGMA optimize(0x10000);")
2382            .map_err(sqlite_err)?;
2383
2384        conn.execute_batch(
2385            "INSERT INTO name_fts(name_fts) VALUES('optimize');
2386             INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2387        )
2388        .map_err(sqlite_err)?;
2389
2390        Ok(())
2391    }
2392
2393    /// Run a non-blocking `wal_checkpoint(PASSIVE)` to fsync committed WAL frames
2394    /// without stalling readers or writers. Call from a short-interval timer to
2395    /// bound the durability window in `async` mode.
2396    pub fn checkpoint_passive(&self) -> Result<()> {
2397        let conn = self.writer.lock();
2398        conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2399            .map_err(sqlite_err)?;
2400        Ok(())
2401    }
2402
2403    fn _traverse(
2404        &self,
2405        name: &str,
2406        direction: Direction,
2407        rtype: Option<&str>,
2408        depth: u32,
2409        // unused — we always include relations; the caller controls via depth
2410        _include_relations: bool,
2411    ) -> Result<String> {
2412        let conn = self.readers.get();
2413        let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2414            Some(v) => v,
2415            None => {
2416                return Err(MCSError::InvalidParams(format!(
2417                    "Entity '{name}' not found"
2418                )))
2419            }
2420        };
2421
2422        let mut all_ids: HashSet<i64> = HashSet::new();
2423        let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2424        let mut frontier: HashSet<i64> = HashSet::new();
2425        all_ids.insert(start_id);
2426        frontier.insert(start_id);
2427
2428        // Read-only type resolution. A requested-but-missing type uses the
2429        // sentinel id -1 (matches no edge), so traversal yields just the start
2430        // entity instead of falling back to "no type filter" and walking every
2431        // edge. `get_type_id` is avoided here: it inserts and cannot run on the
2432        // `query_only` reader connection.
2433        let type_filter: Option<i64> = rtype
2434            .filter(|rt| !rt.is_empty())
2435            .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2436
2437        // Pre-compile all four possible queries outside the loop.
2438        let mut q_out_t = conn.prepare_cached(
2439            "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2440        let mut q_out   = conn.prepare_cached(
2441            "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2442        let mut q_in_t  = conn.prepare_cached(
2443            "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2444        let mut q_in    = conn.prepare_cached(
2445            "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2446
2447        let mut cur_depth = 0u32;
2448        while cur_depth < depth && !frontier.is_empty() {
2449            let mut next_frontier: HashSet<i64> = HashSet::new();
2450
2451            for &fid in &frontier {
2452                if direction == Direction::Outgoing || direction == Direction::Both {
2453                    if let Some(tid) = type_filter {
2454                        if let Ok(ref mut stmt) = q_out_t
2455                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2456                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2457                            }) {
2458                                for row in rows.flatten() {
2459                                    let (to_id, t_id) = row;
2460                                    all_rels.insert((fid, to_id, t_id));
2461                                    if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2462                                }
2463                            }
2464                    } else if let Ok(ref mut stmt) = q_out
2465                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2466                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2467                        }) {
2468                            for row in rows.flatten() {
2469                                let (to_id, t_id) = row;
2470                                all_rels.insert((fid, to_id, t_id));
2471                                if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2472                            }
2473                        }
2474                }
2475
2476                if direction == Direction::Incoming || direction == Direction::Both {
2477                    if let Some(tid) = type_filter {
2478                        if let Ok(ref mut stmt) = q_in_t
2479                            && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2480                                Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2481                            }) {
2482                                for row in rows.flatten() {
2483                                    let (from_id, t_id) = row;
2484                                    all_rels.insert((from_id, fid, t_id));
2485                                    if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2486                                }
2487                            }
2488                    } else if let Ok(ref mut stmt) = q_in
2489                        && let Ok(rows) = stmt.query_map(params![fid], |row| {
2490                            Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2491                        }) {
2492                            for row in rows.flatten() {
2493                                let (from_id, t_id) = row;
2494                                all_rels.insert((from_id, fid, t_id));
2495                                if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2496                            }
2497                        }
2498                }
2499            }
2500
2501            frontier = next_frontier;
2502            cur_depth += 1;
2503        }
2504
2505        let entities_json: String = {
2506            if all_ids.is_empty() {
2507                "[]".to_string()
2508            } else {
2509                let ids: Vec<i64> = all_ids.iter().copied().collect();
2510                let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2511                let sql = format!(
2512                    "SELECT COALESCE(json_group_array(json_object(
2513                        'name', e.name,
2514                        'entityType', t.name,
2515                        'observations', COALESCE((
2516                            SELECT json_group_array(o.body ORDER BY o.idx)
2517                            FROM observation o WHERE o.entity_id = e.id
2518                        ), json('[]'))
2519                    ) ORDER BY e.id), json('[]'))
2520                    FROM entity e
2521                    JOIN type_dict t ON t.id = e.type_id
2522                    WHERE e.id IN ({}) AND e.flags = 0",
2523                    placeholders.join(",")
2524                );
2525                conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2526                    row.get::<_, String>(0)
2527                })
2528                .unwrap_or_else(|_| "[]".to_string())
2529            }
2530        };
2531
2532        let relations_json: String = {
2533            if all_rels.is_empty() {
2534                "[]".to_string()
2535            } else {
2536                let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2537                let sql = format!(
2538                    "WITH r(from_id, to_id, type_id) AS (VALUES {})
2539                    SELECT COALESCE(json_group_array(json_object(
2540                        'from', e1.name,
2541                        'to', e2.name,
2542                        'relationType', t.name
2543                    )), json('[]'))
2544                    FROM r
2545                    JOIN entity e1 ON e1.id = r.from_id
2546                    JOIN entity e2 ON e2.id = r.to_id
2547                    JOIN type_dict t ON t.id = r.type_id
2548                    WHERE e1.flags = 0 AND e2.flags = 0",
2549                    vals.join(", ")
2550                );
2551                let params: Vec<&dyn ToSql> = all_rels.iter()
2552                    .flat_map(|(f, t, tp)| {
2553                        vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2554                    })
2555                    .collect();
2556                let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2557                stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2558                    .unwrap_or_else(|_| "[]".to_string())
2559            }
2560        };
2561
2562        let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2563        out.push_str("{\"entities\":");
2564        out.push_str(&entities_json);
2565        out.push_str(",\"relations\":");
2566        out.push_str(&relations_json);
2567        out.push('}');
2568        Ok(out)
2569    }
2570}
2571
2572// ── Tests ────────────────────────────────────────────────────────────────
2573
2574#[cfg(test)]
2575mod tests {
2576    use super::*;
2577    use serde_json::Value;
2578    use std::ops::Deref;
2579    use std::path::PathBuf;
2580
    /// Test fixture: a `GraphHandle` (field 0) paired with the path of its
    /// database file (field 1). The path is kept so the `Drop` impl can pass it
    /// to `cleanup_db` when the fixture goes out of scope.
    struct TestKg(GraphHandle, PathBuf);
2582
2583    impl Deref for TestKg {
2584        type Target = GraphHandle;
2585        fn deref(&self) -> &GraphHandle {
2586            &self.0
2587        }
2588    }
2589
2590    impl Drop for TestKg {
2591        fn drop(&mut self) {
2592            cleanup_db(&self.1);
2593        }
2594    }
2595
2596    fn cleanup_db(path: &std::path::Path) {
2597        let _ = std::fs::remove_file(path);
2598        let _ = std::fs::remove_file(path.with_extension("db-wal"));
2599        let _ = std::fs::remove_file(path.with_extension("db-shm"));
2600    }
2601
2602    fn new_kg() -> TestKg {
2603        use std::sync::atomic::AtomicU64;
2604        use std::sync::atomic::Ordering;
2605        static COUNTER: AtomicU64 = AtomicU64::new(0);
2606        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2607        let dir = std::env::temp_dir();
2608        let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2609        cleanup_db(&path);
2610        let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2611        TestKg(kg, path)
2612    }
2613
2614    #[test]
2615    fn test_create_and_get_entity() {
2616        let kg = new_kg();
2617        let entities = vec![Entity {
2618            name: "test".into(),
2619            entity_type: "person".into(),
2620            observations: vec!["obs1".into(), "obs2".into()],
2621        }];
2622        let created = kg.create_entities(&entities).unwrap();
2623        assert_eq!(created.len(), 1);
2624
2625        let got = kg.get_entity("test").unwrap().unwrap();
2626        assert_eq!(got.name, "test");
2627        assert_eq!(got.entity_type, "person");
2628        assert_eq!(got.observations, vec!["obs1", "obs2"]);
2629    }
2630
2631    #[test]
2632    fn test_get_nonexistent() {
2633        let kg = new_kg();
2634        assert!(kg.get_entity("nonexistent").unwrap().is_none());
2635    }
2636
2637    #[test]
2638    fn test_delete_entity() {
2639        let kg = new_kg();
2640        kg.create_entities(&[Entity {
2641            name: "del".into(),
2642            entity_type: "t".into(),
2643            observations: vec![],
2644        }])
2645        .unwrap();
2646        assert!(kg.get_entity("del").unwrap().is_some());
2647        kg.delete_entities(&["del".to_string()]).unwrap();
2648        assert!(kg.get_entity("del").unwrap().is_none());
2649    }
2650
2651    #[test]
2652    fn test_add_and_delete_observations() {
2653        let kg = new_kg();
2654        kg.create_entities(&[Entity {
2655            name: "obs_test".into(),
2656            entity_type: "t".into(),
2657            observations: vec!["a".into()],
2658        }])
2659        .unwrap();
2660
2661        let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2662        assert_eq!(added.len(), 2);
2663
2664        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2665        assert!(ent.observations.contains(&"b".into()));
2666        assert!(ent.observations.contains(&"c".into()));
2667
2668        kg.delete_observations("obs_test", &["b".into()]).unwrap();
2669        let ent = kg.get_entity("obs_test").unwrap().unwrap();
2670        assert!(!ent.observations.contains(&"b".into()));
2671        assert!(ent.observations.contains(&"c".into()));
2672        assert!(ent.observations.contains(&"a".into()));
2673    }
2674
2675    #[test]
2676    fn test_create_relations() {
2677        let kg = new_kg();
2678        kg.create_entities(&[
2679            Entity {
2680                name: "A".into(),
2681                entity_type: "node".into(),
2682                observations: vec![],
2683            },
2684            Entity {
2685                name: "B".into(),
2686                entity_type: "node".into(),
2687                observations: vec![],
2688            },
2689        ])
2690        .unwrap();
2691
2692        let rels = kg
2693            .create_relations(&[Relation {
2694                from: "A".into(),
2695                to: "B".into(),
2696                relation_type: "edge".into(),
2697            }])
2698            .unwrap();
2699        assert_eq!(rels.len(), 1);
2700
2701        assert_eq!(kg.get_entity_count().unwrap(), 2);
2702        assert_eq!(kg.get_relation_count().unwrap(), 1);
2703    }
2704
2705    #[test]
2706    fn test_search_nodes() {
2707        let kg = new_kg();
2708        kg.create_entities(&[Entity {
2709            name: "Einstein".into(),
2710            entity_type: "scientist".into(),
2711            observations: vec!["physics".into(), "relativity".into()],
2712        }])
2713        .unwrap();
2714
2715        let results = kg.search_nodes_filtered("physics", None, 0, 10);
2716        assert_eq!(results.len(), 1);
2717        assert_eq!(results[0].name, "Einstein");
2718
2719        let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2720        assert_eq!(results.len(), 1);
2721
2722        let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2723        assert_eq!(results.len(), 0);
2724    }
2725
2726    #[test]
2727    fn test_find_path() {
2728        let kg = new_kg();
2729        kg.create_entities(&[
2730            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2731            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2732            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2733        ]).unwrap();
2734
2735        kg.create_relations(&[
2736            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2737            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2738        ]).unwrap();
2739
2740        let path = kg.find_path("A", "C").unwrap().unwrap();
2741        assert_eq!(path, vec!["A", "B", "C"]);
2742    }
2743
2744    #[test]
2745    fn test_degree() {
2746        let kg = new_kg();
2747        kg.create_entities(&[
2748            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2749            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2750            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2751        ]).unwrap();
2752
2753        kg.create_relations(&[
2754            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2755            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2756        ]).unwrap();
2757
2758        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2759        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2760        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2761    }
2762
2763    #[test]
2764    fn test_neighbors() {
2765        let kg = new_kg();
2766        kg.create_entities(&[
2767            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2768            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2769        ]).unwrap();
2770
2771        kg.create_relations(&[Relation {
2772            from: "A".into(), to: "B".into(), relation_type: "e".into(),
2773        }]).unwrap();
2774
2775        let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2776        let v: Value = serde_json::from_str(&result).unwrap();
2777        assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2778        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2779    }
2780
2781    #[test]
2782    fn test_open_nodes() {
2783        let kg = new_kg();
2784        kg.create_entities(&[
2785            Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2786            Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2787        ]).unwrap();
2788
2789        kg.create_relations(&[Relation {
2790            from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2791        }]).unwrap();
2792
2793        let result = kg.open_nodes(&["X".into()]);
2794        let v: Value = serde_json::from_str(&result).unwrap();
2795        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2796        assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2797    }
2798
2799    #[test]
2800    fn test_entities_exist() {
2801        let kg = new_kg();
2802        kg.create_entities(&[Entity {
2803            name: "exists".into(), entity_type: "t".into(), observations: vec![],
2804        }]).unwrap();
2805
2806        let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2807        assert_eq!(res, vec![true, false]);
2808    }
2809
2810    #[test]
2811    fn test_describe_entity() {
2812        let kg = new_kg();
2813        kg.create_entities(&[Entity {
2814            name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2815        }]).unwrap();
2816
2817        let entity = kg.describe_entity("desc").unwrap();
2818        assert_eq!(entity.name, "desc");
2819    }
2820
2821    #[test]
2822    fn test_entity_type_counts() {
2823        let kg = new_kg();
2824        kg.create_entities(&[
2825            Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2826            Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2827            Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2828        ]).unwrap();
2829
2830        let counts = kg.entity_type_counts();
2831        let map: FxHashMap<_, _> = counts.into_iter().collect();
2832        assert_eq!(map.get("person"), Some(&2));
2833        assert_eq!(map.get("place"), Some(&1));
2834    }
2835
2836    #[test]
2837    fn test_relation_type_counts() {
2838        let kg = new_kg();
2839        kg.create_entities(&[
2840            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2841            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2842            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2843        ]).unwrap();
2844
2845        kg.create_relations(&[
2846            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2847            Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2848        ]).unwrap();
2849
2850        let counts = kg.relation_type_counts();
2851        let map: FxHashMap<_, _> = counts.into_iter().collect();
2852        assert_eq!(map.get("knows"), Some(&2));
2853    }
2854
2855    #[test]
2856    fn test_upsert_entities() {
2857        let kg = new_kg();
2858        kg.create_entities(&[Entity {
2859            name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2860        }]).unwrap();
2861
2862        // Upsert with new type and additional observation.
2863        kg.upsert_entities(&[Entity {
2864            name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2865        }]).unwrap();
2866
2867        let ent = kg.get_entity("u").unwrap().unwrap();
2868        assert_eq!(ent.entity_type, "new");
2869        assert!(ent.observations.contains(&"added".into()));
2870        assert!(ent.observations.contains(&"existing".into()));
2871    }
2872
2873    #[test]
2874    fn test_merge_entities() {
2875        let kg = new_kg();
2876        kg.create_entities(&[
2877            Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2878            Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2879        ]).unwrap();
2880
2881        kg.create_relations(&[Relation {
2882            from: "source".into(), to: "target".into(), relation_type: "e".into(),
2883        }]).unwrap();
2884
2885        let merged = kg.merge_entities("source", "target").unwrap();
2886        assert_eq!(merged.name, "target");
2887        assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2888    }
2889
2890    #[test]
2891    fn test_find_all_paths() {
2892        let kg = new_kg();
2893        kg.create_entities(&[
2894            Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2895            Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2896            Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2897        ]).unwrap();
2898
2899        kg.create_relations(&[
2900            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2901            Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2902            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2903        ]).unwrap();
2904
2905        let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2906        assert!(paths.len() >= 2);
2907    }
2908
2909    #[test]
2910    fn test_batch_get_entities() {
2911        let kg = new_kg();
2912        kg.create_entities(&[
2913            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2914            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2915        ]).unwrap();
2916
2917        let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2918        assert_eq!(results.len(), 3);
2919        assert!(results[0].is_some());
2920        assert!(results[1].is_none());
2921        assert!(results[2].is_some());
2922    }
2923
2924    #[test]
2925    fn test_export_graph() {
2926        let kg = new_kg();
2927        kg.create_entities(&[Entity {
2928            name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2929        }]).unwrap();
2930
2931        let exported = kg.export("json", i64::MAX).unwrap();
2932        assert!(exported.contains("exp"));
2933        assert!(exported.contains("o"));
2934    }
2935
2936    #[test]
2937    fn test_graph_stats() {
2938        let kg = new_kg();
2939        assert_eq!(kg.get_entity_count().unwrap(), 0);
2940        assert_eq!(kg.get_relation_count().unwrap(), 0);
2941
2942        kg.create_entities(&[Entity {
2943            name: "s".into(), entity_type: "t".into(), observations: vec![],
2944        }]).unwrap();
2945
2946        assert_eq!(kg.get_entity_count().unwrap(), 1);
2947    }
2948
2949    #[test]
2950    fn test_read_graph_filtered() {
2951        let kg = new_kg();
2952        kg.create_entities(&[
2953            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2954            Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2955        ]).unwrap();
2956
2957        let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2958        let v: Value = serde_json::from_str(&out).unwrap();
2959        assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2960        assert_eq!(v["entities"][0]["name"], "p1");
2961    }
2962
2963    #[test]
2964    fn test_wipe() {
2965        let kg = new_kg();
2966        kg.create_entities(&[Entity {
2967            name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2968        }]).unwrap();
2969        assert_eq!(kg.get_entity_count().unwrap(), 1);
2970
2971        kg.wipe().unwrap();
2972        assert_eq!(kg.get_entity_count().unwrap(), 0);
2973    }
2974
2975    #[test]
2976    fn test_push_json_str() {
2977        let mut buf = String::new();
2978        push_json_str(&mut buf, "hello");
2979        assert_eq!(buf, "\"hello\"");
2980        let mut buf = String::new();
2981        push_json_str(&mut buf, "he\"llo");
2982        assert_eq!(buf, "\"he\\\"llo\"");
2983    }
2984
2985    // ── create_entities edge cases ────────────────────────────────────
2986
2987    #[test]
2988    fn test_create_entities_empty_input() {
2989        let kg = new_kg();
2990        let created = kg.create_entities(&[]).unwrap();
2991        assert!(created.is_empty());
2992    }
2993
2994    #[test]
2995    fn test_create_entities_skip_empty_name() {
2996        let kg = new_kg();
2997        let created = kg.create_entities(&[Entity {
2998            name: "".into(),
2999            entity_type: "t".into(),
3000            observations: vec![],
3001        }])
3002        .unwrap();
3003        assert!(created.is_empty());
3004        assert_eq!(kg.get_entity_count().unwrap(), 0);
3005    }
3006
3007    #[test]
3008    fn test_create_entities_duplicate_names() {
3009        let kg = new_kg();
3010        let e = Entity {
3011            name: "dup".into(),
3012            entity_type: "t".into(),
3013            observations: vec!["obs".into()],
3014        };
3015        let first = kg.create_entities(&[e.clone()]).unwrap();
3016        assert_eq!(first.len(), 1);
3017        let second = kg.create_entities(&[e.clone()]).unwrap();
3018        assert!(second.is_empty());
3019        assert_eq!(kg.get_entity_count().unwrap(), 1);
3020    }
3021
3022    #[test]
3023    fn test_create_entities_partial_duplicates() {
3024        let kg = new_kg();
3025        let created = kg.create_entities(&[
3026            Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3027            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3028        ]).unwrap();
3029        assert_eq!(created.len(), 2);
3030
3031        let second = kg.create_entities(&[
3032            Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3033            Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3034        ]).unwrap();
3035        assert_eq!(second.len(), 1); // only c created
3036        assert_eq!(second[0].name, "c");
3037        assert_eq!(kg.get_entity_count().unwrap(), 3);
3038    }
3039
3040    #[test]
3041    fn test_create_entities_mixed_empty_and_valid() {
3042        let kg = new_kg();
3043        let created = kg.create_entities(&[
3044            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3045            Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3046            Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3047        ]).unwrap();
3048        assert_eq!(created.len(), 1);
3049        assert_eq!(created[0].name, "valid");
3050        assert_eq!(kg.get_entity_count().unwrap(), 1);
3051    }
3052
3053    #[test]
3054    fn test_create_entities_same_name_in_batch() {
3055        let kg = new_kg();
3056        let created = kg.create_entities(&[
3057            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3058            Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3059        ]).unwrap();
3060        assert_eq!(created.len(), 1);
3061        assert_eq!(kg.get_entity_count().unwrap(), 1);
3062    }
3063
3064    // ── create_relations edge cases ───────────────────────────────────
3065
3066    #[test]
3067    fn test_create_relations_empty_input() {
3068        let kg = new_kg();
3069        let rels = kg.create_relations(&[]).unwrap();
3070        assert!(rels.is_empty());
3071    }
3072
3073    #[test]
3074    fn test_create_relations_nonexistent_from() {
3075        let kg = new_kg();
3076        kg.create_entities(&[Entity {
3077            name: "B".into(), entity_type: "t".into(), observations: vec![],
3078        }]).unwrap();
3079
3080        let rels = kg.create_relations(&[Relation {
3081            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3082        }]).unwrap();
3083        assert!(rels.is_empty());
3084        assert_eq!(kg.get_relation_count().unwrap(), 0);
3085    }
3086
3087    #[test]
3088    fn test_create_relations_nonexistent_to() {
3089        let kg = new_kg();
3090        kg.create_entities(&[Entity {
3091            name: "A".into(), entity_type: "t".into(), observations: vec![],
3092        }]).unwrap();
3093
3094        let rels = kg.create_relations(&[Relation {
3095            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3096        }]).unwrap();
3097        assert!(rels.is_empty());
3098        assert_eq!(kg.get_relation_count().unwrap(), 0);
3099    }
3100
3101    #[test]
3102    fn test_create_relations_both_nonexistent() {
3103        let kg = new_kg();
3104        let rels = kg.create_relations(&[Relation {
3105            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3106        }]).unwrap();
3107        assert!(rels.is_empty());
3108    }
3109
3110    #[test]
3111    fn test_create_relations_self_loop() {
3112        let kg = new_kg();
3113        kg.create_entities(&[Entity {
3114            name: "self".into(), entity_type: "t".into(), observations: vec![],
3115        }]).unwrap();
3116
3117        let rels = kg.create_relations(&[Relation {
3118            from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3119        }]).unwrap();
3120        assert_eq!(rels.len(), 1);
3121        assert_eq!(kg.get_relation_count().unwrap(), 1);
3122        assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3123        assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3124    }
3125
3126    #[test]
3127    fn test_create_relations_duplicate() {
3128        let kg = new_kg();
3129        kg.create_entities(&[
3130            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3131            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3132        ]).unwrap();
3133
3134        let r = Relation {
3135            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3136        };
3137        let first = kg.create_relations(&[r.clone()]).unwrap();
3138        assert_eq!(first.len(), 1);
3139
3140        let second = kg.create_relations(&[r.clone()]).unwrap();
3141        assert!(second.is_empty());
3142        assert_eq!(kg.get_relation_count().unwrap(), 1);
3143    }
3144
3145    #[test]
3146    fn test_create_relations_new_type_auto_created() {
3147        let kg = new_kg();
3148        kg.create_entities(&[
3149            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3150            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3151        ]).unwrap();
3152
3153        let rels = kg.create_relations(&[Relation {
3154            from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3155        }]).unwrap();
3156        assert_eq!(rels.len(), 1);
3157
3158        let counts = kg.relation_type_counts();
3159        let map: FxHashMap<_, _> = counts.into_iter().collect();
3160        assert_eq!(map.get("brand_new_type"), Some(&1));
3161    }
3162
3163    #[test]
3164    fn test_create_relations_degree_updates() {
3165        let kg = new_kg();
3166        kg.create_entities(&[
3167            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3168            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3169            Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3170        ]).unwrap();
3171
3172        kg.create_relations(&[
3173            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3174            Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3175        ]).unwrap();
3176
3177        assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3178        assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3179        assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3180        assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3181        assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3182    }
3183
3184    #[test]
3185    fn test_create_relations_delete_and_recreate() {
3186        let kg = new_kg();
3187        kg.create_entities(&[
3188            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3189            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3190        ]).unwrap();
3191
3192        let r = Relation {
3193            from: "A".into(), to: "B".into(), relation_type: "e".into(),
3194        };
3195        kg.create_relations(&[r.clone()]).unwrap();
3196        assert_eq!(kg.get_relation_count().unwrap(), 1);
3197
3198        kg.delete_relations(&[r.clone()]).unwrap();
3199        assert_eq!(kg.get_relation_count().unwrap(), 0);
3200
3201        // Recreate after delete
3202        let re = kg.create_relations(&[r.clone()]).unwrap();
3203        assert_eq!(re.len(), 1);
3204        assert_eq!(kg.get_relation_count().unwrap(), 1);
3205    }
3206
3207    // ── Integration edge cases ────────────────────────────────────────
3208
3209    #[test]
3210    fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3211        let kg = new_kg();
3212        kg.create_entities(&[
3213            Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3214            Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3215        ]).unwrap();
3216        kg.create_relations(&[
3217            Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3218        ]).unwrap();
3219
3220        assert_eq!(kg.get_relation_count().unwrap(), 1);
3221
3222        // Deleting entity A should also delete the relation
3223        kg.delete_entities(&["A".into()]).unwrap();
3224        assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3225        assert_eq!(kg.get_relation_count().unwrap(), 0);
3226    }
3227
3228    #[test]
3229    fn test_graph_stats_after_entity_with_observations() {
3230        let kg = new_kg();
3231        kg.create_entities(&[Entity {
3232            name: "stat".into(), entity_type: "t".into(),
3233            observations: vec!["o1".into(), "o2".into(), "o3".into()],
3234        }]).unwrap();
3235
3236        let ecount = kg.get_entity_count().unwrap();
3237        // graph_stat for observations is tracked but there's no public getter for it
3238        assert_eq!(ecount, 1);
3239
3240        // delete reverts stats
3241        kg.delete_entities(&["stat".into()]).unwrap();
3242        assert_eq!(kg.get_entity_count().unwrap(), 0);
3243    }
3244
3245    // ── Helpers for the fix-specific suites ────────────────────────────────
3246
3247    fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3248        use std::sync::atomic::AtomicU64;
3249        static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3250        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3251        let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3252        cleanup_db(&path);
3253        let kg = GraphHandle::new(
3254            &path,
3255            Durability::Async,
3256            SqliteTuning::default(),
3257            NonZeroUsize::new(10_000).unwrap(),
3258            read_pool_size,
3259        )
3260        .expect("create KG");
3261        TestKg(kg, path)
3262    }
3263
3264    fn seed_line(kg: &GraphHandle, n: usize) {
3265        let entities: Vec<Entity> = (0..n)
3266            .map(|i| Entity {
3267                name: format!("n{i}"),
3268                entity_type: "node".into(),
3269                observations: vec![format!("obs of n{i}")],
3270            })
3271            .collect();
3272        kg.create_entities(&entities).unwrap();
3273        let rels: Vec<Relation> = (0..n.saturating_sub(1))
3274            .map(|i| Relation {
3275                from: format!("n{i}"),
3276                to: format!("n{}", i + 1),
3277                relation_type: "edge".into(),
3278            })
3279            .collect();
3280        if !rels.is_empty() {
3281            kg.create_relations(&rels).unwrap();
3282        }
3283    }
3284
3285    fn count_relations(graph_json: &str) -> usize {
3286        let v: Value = serde_json::from_str(graph_json).unwrap();
3287        v["relations"].as_array().unwrap().len()
3288    }
3289
3290    fn count_entities(graph_json: &str) -> usize {
3291        let v: Value = serde_json::from_str(graph_json).unwrap();
3292        v["entities"].as_array().unwrap().len()
3293    }
3294
3295    // ── Fix #1: reader pool / concurrency ──────────────────────────────────
3296
3297    #[test]
3298    fn test_pool_size_one_still_works() {
3299        let kg = new_kg_with_pool(1);
3300        seed_line(&kg, 5);
3301        assert_eq!(kg.get_entity_count().unwrap(), 5);
3302        assert!(kg.get_entity("n2").unwrap().is_some());
3303        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3304        assert_eq!(count_entities(&g), 5);
3305    }
3306
3307    #[test]
3308    fn test_reads_see_committed_writes() {
3309        // A read on a pool connection must observe a just-committed write made on
3310        // the writer connection (WAL visibility).
3311        let kg = new_kg_with_pool(4);
3312        kg.create_entities(&[Entity {
3313            name: "fresh".into(),
3314            entity_type: "t".into(),
3315            observations: vec!["v".into()],
3316        }])
3317        .unwrap();
3318        // get_entity goes through the reader pool.
3319        let got = kg.get_entity("fresh").unwrap().unwrap();
3320        assert_eq!(got.observations, vec!["v"]);
3321    }
3322
3323    #[test]
3324    fn test_concurrent_readers_consistent() {
3325        // Many readers hammering the pool while the writer mutates must never
3326        // panic, deadlock, or observe a torn graph. The final counts must match.
3327        let kg = new_kg_with_pool(4);
3328        seed_line(&kg, 50);
3329
3330        std::thread::scope(|s| {
3331            // 8 reader threads.
3332            for _ in 0..8 {
3333                s.spawn(|| {
3334                    for _ in 0..200 {
3335                        let _ = kg.get_entity("n10");
3336                        let _ = kg.search_nodes_filtered("obs", None, 0, 10);
3337                        let _ = kg.read_graph_filtered(None, 0, 100);
3338                        let _ = kg.get_entity_count();
3339                        let _ = kg.neighbors("n10", Direction::Both, None, 2);
3340                    }
3341                });
3342            }
3343            // 1 writer thread adding more entities concurrently.
3344            s.spawn(|| {
3345                for i in 100..160 {
3346                    kg.create_entities(&[Entity {
3347                        name: format!("w{i}"),
3348                        entity_type: "node".into(),
3349                        observations: vec![format!("w obs {i}")],
3350                    }])
3351                    .unwrap();
3352                }
3353            });
3354        });
3355
3356        // 50 seeded + 60 written.
3357        assert_eq!(kg.get_entity_count().unwrap(), 110);
3358        assert!(kg.get_entity("w159").unwrap().is_some());
3359    }
3360
3361    #[test]
3362    fn test_reader_pool_rejects_writes_internally() {
3363        // Sanity: query_only readers cannot mutate. We can't call a write through
3364        // the pool directly, but we can confirm a read method that *would* have
3365        // inserted (search_relations resolving a missing type) does not create a
3366        // phantom type — see the dedicated test below — and that reads under a
3367        // size-1 pool serialize correctly without deadlock.
3368        let kg = new_kg_with_pool(1);
3369        seed_line(&kg, 3);
3370        std::thread::scope(|s| {
3371            for _ in 0..4 {
3372                s.spawn(|| {
3373                    for _ in 0..100 {
3374                        let _ = kg.read_graph_filtered(None, 0, 10);
3375                    }
3376                });
3377            }
3378        });
3379        assert_eq!(kg.get_entity_count().unwrap(), 3);
3380    }
3381
3382    // ── Fix #6: read_graph relation scoping + export bound ─────────────────
3383
3384    #[test]
3385    fn test_read_graph_relations_scoped_to_page() {
3386        let kg = new_kg_with_pool(2);
3387        // n0 -> n1 -> n2 -> n3 (4 entities, 3 edges).
3388        seed_line(&kg, 4);
3389
3390        // Full page: all 3 edges present.
3391        let full = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3392        assert_eq!(count_entities(&full), 4);
3393        assert_eq!(count_relations(&full), 3);
3394
3395        // Page of only the first entity (n0): its only edge n0->n1 has an
3396        // endpoint (n1) outside the page, so no relations are returned.
3397        let page1 = kg.read_graph_filtered(None, 0, 1).unwrap();
3398        assert_eq!(count_entities(&page1), 1);
3399        assert_eq!(count_relations(&page1), 0);
3400
3401        // Page of first two entities (n0, n1): edge n0->n1 fully inside, n1->n2
3402        // straddles the boundary and is excluded.
3403        let page2 = kg.read_graph_filtered(None, 0, 2).unwrap();
3404        assert_eq!(count_entities(&page2), 2);
3405        assert_eq!(count_relations(&page2), 1);
3406    }
3407
3408    #[test]
3409    fn test_read_graph_pagination_offset() {
3410        let kg = new_kg_with_pool(2);
3411        seed_line(&kg, 5);
3412        let g = kg.read_graph_filtered(None, 2, 2).unwrap();
3413        assert_eq!(count_entities(&g), 2);
3414        // Entities are ordered by id; offset 2 skips n0, n1.
3415        assert!(!g.contains("\"n0\""));
3416        assert!(!g.contains("\"n1\""));
3417        assert!(g.contains("\"n2\""));
3418        assert!(g.contains("\"n3\""));
3419    }
3420
3421    #[test]
3422    fn test_read_graph_empty() {
3423        let kg = new_kg_with_pool(2);
3424        let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3425        assert_eq!(g, r#"{"entities":[],"relations":[]}"#);
3426    }
3427
3428    #[test]
3429    fn test_read_graph_filtered_by_type() {
3430        let kg = new_kg_with_pool(2);
3431        kg.create_entities(&[
3432            Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
3433            Entity { name: "q1".into(), entity_type: "place".into(), observations: vec![] },
3434            Entity { name: "p2".into(), entity_type: "person".into(), observations: vec![] },
3435        ])
3436        .unwrap();
3437        let g = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
3438        assert_eq!(count_entities(&g), 2);
3439        assert!(g.contains("\"p1\""));
3440        assert!(g.contains("\"p2\""));
3441        assert!(!g.contains("\"q1\""));
3442    }
3443
3444    #[test]
3445    fn test_export_respects_max_rows() {
3446        let kg = new_kg_with_pool(2);
3447        seed_line(&kg, 5);
3448
3449        // Unbounded export returns everything.
3450        let full = kg.export("json", i64::MAX).unwrap();
3451        assert_eq!(count_entities(&full), 5);
3452        assert_eq!(count_relations(&full), 4);
3453
3454        // Capped export truncates both arrays to the cap.
3455        let capped = kg.export("json", 2).unwrap();
3456        assert_eq!(count_entities(&capped), 2);
3457        assert_eq!(count_relations(&capped), 2);
3458    }
3459
3460    #[test]
3461    fn test_export_negative_max_rows_is_unbounded() {
3462        let kg = new_kg_with_pool(2);
3463        seed_line(&kg, 3);
3464        // SQLite treats a negative LIMIT as "no limit".
3465        let out = kg.export("json", -1).unwrap();
3466        assert_eq!(count_entities(&out), 3);
3467    }
3468
3469    // ── Fix #8: writes remain correct without the per-write PRAGMA optimize ─
3470
3471    #[test]
3472    fn test_many_small_write_batches_stay_consistent() {
3473        let kg = new_kg_with_pool(2);
3474        for i in 0..100 {
3475            kg.create_entities(&[Entity {
3476                name: format!("e{i}"),
3477                entity_type: "t".into(),
3478                observations: vec![format!("o{i}")],
3479            }])
3480            .unwrap();
3481        }
3482        assert_eq!(kg.get_entity_count().unwrap(), 100);
3483        // Search must still find a needle inserted across many tiny batches,
3484        // proving FTS stayed consistent without per-write optimization.
3485        let hits = kg.search_nodes_filtered("e57", None, 0, 10);
3486        assert!(hits.iter().any(|e| e.name == "e57"));
3487    }
3488
3489    // ── Fix #9: wipe fully resets the FTS indexes ──────────────────────────
3490
3491    #[test]
3492    fn test_wipe_clears_name_and_obs_fts() {
3493        let kg = new_kg_with_pool(2);
3494        kg.create_entities(&[Entity {
3495            name: "Einstein".into(),
3496            entity_type: "scientist".into(),
3497            observations: vec!["physics".into()],
3498        }])
3499        .unwrap();
3500
3501        // Both FTS indexes resolve before the wipe.
3502        assert_eq!(kg.search_nodes_filtered("Einstein", None, 0, 10).len(), 1);
3503        assert_eq!(kg.search_nodes_filtered("physics", None, 0, 10).len(), 1);
3504
3505        kg.wipe().unwrap();
3506
3507        // After wipe both indexes must be empty (a bare DELETE on an
3508        // external-content FTS5 table would have left stale rowids behind).
3509        assert_eq!(kg.get_entity_count().unwrap(), 0);
3510        assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
3511        assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
3512    }
3513
3514    #[test]
3515    fn test_wipe_then_recreate_search_works() {
3516        // Recreating the same names after a wipe must produce a clean, searchable
3517        // index — not a corrupted one or duplicate FTS rows.
3518        let kg = new_kg_with_pool(2);
3519        kg.create_entities(&[Entity {
3520            name: "Einstein".into(),
3521            entity_type: "scientist".into(),
3522            observations: vec!["physics".into()],
3523        }])
3524        .unwrap();
3525        kg.wipe().unwrap();
3526
3527        kg.create_entities(&[Entity {
3528            name: "Einstein".into(),
3529            entity_type: "scientist".into(),
3530            observations: vec!["physics".into(), "relativity".into()],
3531        }])
3532        .unwrap();
3533
3534        let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
3535        assert_eq!(by_name.len(), 1, "exactly one Einstein after recreate");
3536        let by_obs = kg.search_nodes_filtered("relativity", None, 0, 10);
3537        assert_eq!(by_obs.len(), 1);
3538        assert_eq!(kg.get_entity_count().unwrap(), 1);
3539    }
3540
3541    // ── Read-only type/entity resolution (introduced by the reader pool) ───
3542
3543    #[test]
3544    fn test_search_relations_missing_type_returns_empty() {
3545        let kg = new_kg_with_pool(2);
3546        seed_line(&kg, 3); // edges of type "edge"
3547        // A filter for a relation type that does not exist must return nothing,
3548        // not every relation — and must not create a phantom type row.
3549        let r = kg.search_relations(None, None, Some("does_not_exist"));
3550        assert!(r.is_empty());
3551        // The phantom type must not have been inserted by the read.
3552        let types = kg.relation_type_counts();
3553        assert!(types.iter().all(|(t, _)| t != "does_not_exist"));
3554    }
3555
3556    #[test]
3557    fn test_search_relations_missing_from_returns_empty() {
3558        let kg = new_kg_with_pool(2);
3559        seed_line(&kg, 3);
3560        let r = kg.search_relations(Some("ghost"), None, None);
3561        assert!(r.is_empty(), "missing 'from' must not match every relation");
3562    }
3563
3564    #[test]
3565    fn test_search_relations_existing_filters_still_work() {
3566        let kg = new_kg_with_pool(2);
3567        seed_line(&kg, 3);
3568        let r = kg.search_relations(Some("n0"), None, Some("edge"));
3569        assert_eq!(r.len(), 1);
3570        assert_eq!(r[0].from, "n0");
3571        assert_eq!(r[0].to, "n1");
3572    }
3573
3574    #[test]
3575    fn test_neighbors_missing_type_returns_only_start() {
3576        let kg = new_kg_with_pool(2);
3577        seed_line(&kg, 3);
3578        let json = kg
3579            .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
3580            .unwrap();
3581        // No edge matches the bogus type, so only the start node comes back.
3582        assert_eq!(count_entities(&json), 1);
3583        assert_eq!(count_relations(&json), 0);
3584    }
3585
3586    #[test]
3587    fn test_neighbors_existing_type_filters() {
3588        let kg = new_kg_with_pool(2);
3589        kg.create_entities(&[
3590            Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
3591            Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
3592            Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
3593        ])
3594        .unwrap();
3595        kg.create_relations(&[
3596            Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
3597            Relation { from: "a".into(), to: "c".into(), relation_type: "likes".into() },
3598        ])
3599        .unwrap();
3600        let json = kg.neighbors("a", Direction::Outgoing, Some("knows"), 1).unwrap();
3601        assert!(json.contains("\"b\""));
3602        assert!(!json.contains("\"c\""));
3603        assert_eq!(count_relations(&json), 1);
3604    }
3605
3606    #[test]
3607    fn test_sqlite_tuning_applied_to_fresh_db() {
3608        use std::sync::atomic::AtomicU64;
3609        static COUNTER: AtomicU64 = AtomicU64::new(2_000_000);
3610        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3611        let path = std::env::temp_dir().join(format!("kg_tuning_{}_{}.db", std::process::id(), n));
3612        cleanup_db(&path);
3613
3614        let tuning = SqliteTuning {
3615            page_size: 8192,
3616            ..SqliteTuning::default()
3617        };
3618        let kg = TestKg(
3619            GraphHandle::new(&path, Durability::Async, tuning, NonZeroUsize::new(64).unwrap(), 2)
3620                .expect("create KG"),
3621            path.clone(),
3622        );
3623        kg.create_entities(&[Entity {
3624            name: "a".into(),
3625            entity_type: "n".into(),
3626            observations: vec!["o".into()],
3627        }])
3628        .unwrap();
3629
3630        // page_size (fresh-DB only) and auto_vacuum=INCREMENTAL must have taken
3631        // effect, and journal_mode must be WAL.
3632        let probe = Connection::open(&path).unwrap();
3633        let page_size: i64 = probe.query_row("PRAGMA page_size", [], |r| r.get(0)).unwrap();
3634        assert_eq!(page_size, 8192);
3635        let auto_vacuum: i64 = probe.query_row("PRAGMA auto_vacuum", [], |r| r.get(0)).unwrap();
3636        assert_eq!(auto_vacuum, 2, "expected INCREMENTAL auto_vacuum");
3637        let journal: String = probe.query_row("PRAGMA journal_mode", [], |r| r.get(0)).unwrap();
3638        assert_eq!(journal.to_lowercase(), "wal");
3639    }
3640
3641    #[test]
3642    fn test_checkpoint_passive_is_noop_safe() {
3643        let kg = new_kg();
3644        // On an empty / freshly-written DB a passive checkpoint must succeed.
3645        kg.checkpoint_passive().unwrap();
3646        kg.create_entities(&[Entity {
3647            name: "a".into(),
3648            entity_type: "n".into(),
3649            observations: vec!["o".into()],
3650        }])
3651        .unwrap();
3652        // And after a write, repeatedly, without error or deadlock.
3653        kg.checkpoint_passive().unwrap();
3654        kg.checkpoint_passive().unwrap();
3655        // Data is still readable afterwards.
3656        assert!(kg.get_entity("a").unwrap().is_some());
3657    }
3658}