// zeph_memory/graph/store/mod.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::collections::HashMap;

use futures::Stream;

#[allow(unused_imports)]
use zeph_db::sql;
use zeph_db::fts::sanitize_fts_query;
use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};

use crate::error::MemoryError;
use crate::types::MessageId;

use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
/// Relational store for the knowledge graph: entities, aliases, temporal edges,
/// and communities. All queries go through the shared `DbPool`.
pub struct GraphStore {
    // Shared connection pool; cloned handles are cheap (pool is internally ref-counted).
    pool: DbPool,
}
20
21impl GraphStore {
22    #[must_use]
23    pub fn new(pool: DbPool) -> Self {
24        Self { pool }
25    }
26
27    #[must_use]
28    pub fn pool(&self) -> &DbPool {
29        &self.pool
30    }
31
32    // ── Entities ─────────────────────────────────────────────────────────────
33
34    /// Insert or update an entity by `(canonical_name, entity_type)`.
35    ///
36    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
37    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
38    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
39    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the database query fails.
44    pub async fn upsert_entity(
45        &self,
46        surface_name: &str,
47        canonical_name: &str,
48        entity_type: EntityType,
49        summary: Option<&str>,
50    ) -> Result<i64, MemoryError> {
51        let type_str = entity_type.as_str();
52        let id: i64 = zeph_db::query_scalar(sql!(
53            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
54             VALUES (?, ?, ?, ?)
55             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
56               name = excluded.name,
57               summary = COALESCE(excluded.summary, summary),
58               last_seen_at = CURRENT_TIMESTAMP
59             RETURNING id"
60        ))
61        .bind(surface_name)
62        .bind(canonical_name)
63        .bind(type_str)
64        .bind(summary)
65        .fetch_one(&self.pool)
66        .await?;
67        Ok(id)
68    }
69
70    /// Find an entity by exact canonical name and type.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the database query fails.
75    pub async fn find_entity(
76        &self,
77        canonical_name: &str,
78        entity_type: EntityType,
79    ) -> Result<Option<Entity>, MemoryError> {
80        let type_str = entity_type.as_str();
81        let row: Option<EntityRow> = zeph_db::query_as(
82            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
83             FROM graph_entities
84             WHERE canonical_name = ? AND entity_type = ?"),
85        )
86        .bind(canonical_name)
87        .bind(type_str)
88        .fetch_optional(&self.pool)
89        .await?;
90        row.map(entity_from_row).transpose()
91    }
92
93    /// Find an entity by its numeric ID.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the database query fails.
98    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
99        let row: Option<EntityRow> = zeph_db::query_as(
100            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
101             FROM graph_entities
102             WHERE id = ?"),
103        )
104        .bind(entity_id)
105        .fetch_optional(&self.pool)
106        .await?;
107        row.map(entity_from_row).transpose()
108    }
109
110    /// Update the `qdrant_point_id` for an entity.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the database query fails.
115    pub async fn set_entity_qdrant_point_id(
116        &self,
117        entity_id: i64,
118        point_id: &str,
119    ) -> Result<(), MemoryError> {
120        zeph_db::query(sql!(
121            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
122        ))
123        .bind(point_id)
124        .bind(entity_id)
125        .execute(&self.pool)
126        .await?;
127        Ok(())
128    }
129
    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results.
    ///
    /// Uses FTS5 MATCH with prefix wildcards (`token*`) over `graph_entities_fts`, UNIONed
    /// with a case-insensitive `LIKE` substring search over `graph_entity_aliases`.
    ///
    /// NOTE(review): earlier docs claimed bm25 relevance ranking (name weighted 10x over
    /// summary), but the statement below has no `ORDER BY` — UNION output order is
    /// backend-determined, so results are effectively unranked. If ranked output is
    /// required, wrap the UNION in a subselect exposing `bm25(graph_entities_fts, 10.0, 1.0)`
    /// and sort ascending (bm25 returns negative scores; lower is better).
    ///
    /// # Behavioral note
    ///
    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
    /// mid-word substrings are not matched. This is a known trade-off for index performance.
    ///
    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
    /// The `limit` parameter caps the combined (FTS + alias) result set. No minimum query length
    /// is enforced; if this causes noise in practice, add a minimum length guard at the call site.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
        // prevents syntax errors when user input contains them as literal search terms
        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap the query at 512 bytes, rounded down to a char boundary so slicing cannot panic.
        let query = &query[..query.floor_char_boundary(512)];
        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        // All tokens may have been operator keywords — nothing left to search for.
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // NOTE(review): no ORDER BY — rows come back in backend-determined order.
        // UNION (not UNION ALL) also dedups rows appearing in both branches.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters in the raw query before wrapping in %...%.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
211
212    /// Flush the `SQLite` WAL to the main database file.
213    ///
214    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
215    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
216    /// visible to connections opened in future sessions.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the PRAGMA execution fails.
221    #[cfg(feature = "sqlite")]
222    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
223        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
224            .execute(&self.pool)
225            .await?;
226        Ok(())
227    }
228
    /// No-op on `PostgreSQL` (WAL management is handled by the server).
    ///
    /// Kept so callers compile identically against either backend feature.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
238
    /// Stream all entities from the database incrementally (true cursor, no full-table load).
    ///
    /// Rows are ordered by `id ASC`. Errors surface per-item: both driver errors and
    /// row-conversion failures (e.g. an unparseable `entity_type`, per `all_entities`)
    /// are yielded as `Err` items rather than failing stream construction.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        // Convert the driver error first, then run the row -> domain conversion.
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
251
252    // ── Alias methods ─────────────────────────────────────────────────────────
253
254    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the database query fails.
259    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
260        let insert_alias_sql = format!(
261            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
262            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
263            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
264        );
265        zeph_db::query(&insert_alias_sql)
266            .bind(entity_id)
267            .bind(alias_name)
268            .execute(&self.pool)
269            .await?;
270        Ok(())
271    }
272
273    /// Find an entity by alias name and entity type (case-insensitive).
274    ///
275    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the database query fails.
280    pub async fn find_entity_by_alias(
281        &self,
282        alias_name: &str,
283        entity_type: EntityType,
284    ) -> Result<Option<Entity>, MemoryError> {
285        let type_str = entity_type.as_str();
286        let alias_typed_sql = format!(
287            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
288                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
289             FROM graph_entity_aliases a \
290             JOIN graph_entities e ON e.id = a.entity_id \
291             WHERE a.alias_name = ? {} \
292               AND e.entity_type = ? \
293             ORDER BY e.id ASC \
294             LIMIT 1",
295            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
296        );
297        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
298            .bind(alias_name)
299            .bind(type_str)
300            .fetch_optional(&self.pool)
301            .await?;
302        row.map(entity_from_row).transpose()
303    }
304
305    /// Get all aliases for an entity.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the database query fails.
310    pub async fn aliases_for_entity(
311        &self,
312        entity_id: i64,
313    ) -> Result<Vec<EntityAlias>, MemoryError> {
314        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
315            "SELECT id, entity_id, alias_name, created_at
316             FROM graph_entity_aliases
317             WHERE entity_id = ?
318             ORDER BY id ASC"
319        ))
320        .bind(entity_id)
321        .fetch_all(&self.pool)
322        .await?;
323        Ok(rows.into_iter().map(alias_from_row).collect())
324    }
325
326    /// Collect all entities into a Vec.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the database query fails or `entity_type` parsing fails.
331    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
332        use futures::TryStreamExt as _;
333        self.all_entities_stream().try_collect().await
334    }
335
336    /// Count the total number of entities.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the database query fails.
341    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
342        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
343            .fetch_one(&self.pool)
344            .await?;
345        Ok(count)
346    }
347
348    // ── Edges ─────────────────────────────────────────────────────────────────
349
350    /// Insert a new edge between two entities, or update the existing active edge.
351    ///
352    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
353    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
354    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
355    /// duplicate edges from repeated extraction of the same context messages.
356    ///
357    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
358    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
359    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
360    /// update the first and lose the type classification.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the database query fails.
365    pub async fn insert_edge(
366        &self,
367        source_entity_id: i64,
368        target_entity_id: i64,
369        relation: &str,
370        fact: &str,
371        confidence: f32,
372        episode_id: Option<MessageId>,
373    ) -> Result<i64, MemoryError> {
374        self.insert_edge_typed(
375            source_entity_id,
376            target_entity_id,
377            relation,
378            fact,
379            confidence,
380            episode_id,
381            EdgeType::Semantic,
382        )
383        .await
384    }
385
    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
    ///
    /// Identical semantics to [`insert_edge`] but with an explicit `edge_type` parameter.
    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
    /// `confidence` is clamped to `[0.0, 1.0]` before storage; on a dedup hit the stored
    /// confidence becomes the max of old and new.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError::InvalidInput`] if `source_entity_id == target_entity_id`
    /// (self-loops are rejected), or an error if the database query fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        // Keep stored confidence within [0, 1] regardless of extractor output.
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
        // between existence check and write. The unique partial index uq_graph_edges_active
        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
        let mut tx = zeph_db::begin(&self.pool).await?;

        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
               AND target_entity_id = ?
               AND relation = ?
               AND edge_type = ?
               AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        // Dedup hit: raise stored confidence to max(stored, incoming) and return the old id.
        if let Some((existing_id, stored_conf)) = existing {
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        // No active edge with this key: insert a fresh row.
        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
465
466    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the database update fails.
471    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
472        zeph_db::query(sql!(
473            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
474             WHERE id = ?"
475        ))
476        .bind(edge_id)
477        .execute(&self.pool)
478        .await?;
479        Ok(())
480    }
481
482    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
483    ///
484    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the database update fails.
489    pub async fn invalidate_edge_with_supersession(
490        &self,
491        old_edge_id: i64,
492        new_edge_id: i64,
493    ) -> Result<(), MemoryError> {
494        zeph_db::query(sql!(
495            "UPDATE graph_edges
496             SET valid_to = CURRENT_TIMESTAMP,
497                 expired_at = CURRENT_TIMESTAMP,
498                 superseded_by = ?
499             WHERE id = ?"
500        ))
501        .bind(new_edge_id)
502        .bind(old_edge_id)
503        .execute(&self.pool)
504        .await?;
505        Ok(())
506    }
507
508    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
509    ///
510    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
511    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
512    /// in v1 — see `bfs_at_timestamp` for historical traversal).
513    ///
514    /// # `SQLite` bind limit safety
515    ///
516    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
517    /// default). Each entity ID requires two bind slots (source OR target), so batches are
518    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
519    /// compile-time `SQLite` configuration.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the database query fails.
524    pub async fn edges_for_entities(
525        &self,
526        entity_ids: &[i64],
527        edge_types: &[super::types::EdgeType],
528    ) -> Result<Vec<Edge>, MemoryError> {
529        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
530        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
531        // 490 * 2 = 980, leaving headroom for future query additions.
532        const MAX_BATCH_ENTITIES: usize = 490;
533
534        let mut all_edges: Vec<Edge> = Vec::new();
535
536        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
537            let edges = self.query_batch_edges(chunk, edge_types).await?;
538            all_edges.extend(edges);
539        }
540
541        Ok(all_edges)
542    }
543
544    /// Query active edges for a single chunk of entity IDs (internal helper).
545    ///
546    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
547    async fn query_batch_edges(
548        &self,
549        entity_ids: &[i64],
550        edge_types: &[super::types::EdgeType],
551    ) -> Result<Vec<Edge>, MemoryError> {
552        if entity_ids.is_empty() {
553            return Ok(Vec::new());
554        }
555
556        // Build a parameterized IN clause with backend-appropriate placeholders.
557        // We cannot use the sql! macro here because the placeholder count is dynamic.
558        let n_ids = entity_ids.len();
559        let n_types = edge_types.len();
560
561        let sql = if n_types == 0 {
562            // placeholders used twice (source IN and target IN)
563            let placeholders = placeholder_list(1, n_ids);
564            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
565            format!(
566                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
567                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
568                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
569                 FROM graph_edges
570                 WHERE valid_to IS NULL
571                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
572            )
573        } else {
574            let placeholders = placeholder_list(1, n_ids);
575            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
576            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
577            format!(
578                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
579                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
580                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
581                 FROM graph_edges
582                 WHERE valid_to IS NULL
583                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
584                   AND edge_type IN ({type_placeholders})"
585            )
586        };
587
588        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
589        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
590        for id in entity_ids {
591            query = query.bind(*id);
592        }
593        for id in entity_ids {
594            query = query.bind(*id);
595        }
596        for et in edge_types {
597            query = query.bind(et.as_str());
598        }
599
600        let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
601        Ok(rows.into_iter().map(edge_from_row).collect())
602    }
603
604    /// Get all active edges where entity is source or target.
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the database query fails.
609    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
610        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
611            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
612                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
613                    edge_type, retrieval_count, last_retrieved_at, superseded_by
614             FROM graph_edges
615             WHERE valid_to IS NULL
616               AND (source_entity_id = ? OR target_entity_id = ?)"
617        ))
618        .bind(entity_id)
619        .bind(entity_id)
620        .fetch_all(&self.pool)
621        .await?;
622        Ok(rows.into_iter().map(edge_from_row).collect())
623    }
624
625    /// Get all edges (active and expired) where entity is source or target, ordered by
626    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
627    ///
628    /// # Errors
629    ///
630    /// Returns an error if the database query fails or if `limit` overflows `i64`.
631    pub async fn edge_history_for_entity(
632        &self,
633        entity_id: i64,
634        limit: usize,
635    ) -> Result<Vec<Edge>, MemoryError> {
636        let limit = i64::try_from(limit)?;
637        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
638            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
639                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
640                    edge_type, retrieval_count, last_retrieved_at, superseded_by
641             FROM graph_edges
642             WHERE source_entity_id = ? OR target_entity_id = ?
643             ORDER BY valid_from DESC
644             LIMIT ?"
645        ))
646        .bind(entity_id)
647        .bind(entity_id)
648        .bind(limit)
649        .fetch_all(&self.pool)
650        .await?;
651        Ok(rows.into_iter().map(edge_from_row).collect())
652    }
653
654    /// Get all active edges between two entities (both directions).
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the database query fails.
659    pub async fn edges_between(
660        &self,
661        entity_a: i64,
662        entity_b: i64,
663    ) -> Result<Vec<Edge>, MemoryError> {
664        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
665            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
666                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
667                    edge_type, retrieval_count, last_retrieved_at, superseded_by
668             FROM graph_edges
669             WHERE valid_to IS NULL
670               AND ((source_entity_id = ? AND target_entity_id = ?)
671                 OR (source_entity_id = ? AND target_entity_id = ?))"
672        ))
673        .bind(entity_a)
674        .bind(entity_b)
675        .bind(entity_b)
676        .bind(entity_a)
677        .fetch_all(&self.pool)
678        .await?;
679        Ok(rows.into_iter().map(edge_from_row).collect())
680    }
681
682    /// Get active edges from `source` to `target` in the exact direction (no reverse).
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the database query fails.
687    pub async fn edges_exact(
688        &self,
689        source_entity_id: i64,
690        target_entity_id: i64,
691    ) -> Result<Vec<Edge>, MemoryError> {
692        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
693            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
694                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
695                    edge_type, retrieval_count, last_retrieved_at, superseded_by
696             FROM graph_edges
697             WHERE valid_to IS NULL
698               AND source_entity_id = ?
699               AND target_entity_id = ?"
700        ))
701        .bind(source_entity_id)
702        .bind(target_entity_id)
703        .fetch_all(&self.pool)
704        .await?;
705        Ok(rows.into_iter().map(edge_from_row).collect())
706    }
707
708    /// Count active (non-invalidated) edges.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the database query fails.
713    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
714        let count: i64 = zeph_db::query_scalar(sql!(
715            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
716        ))
717        .fetch_one(&self.pool)
718        .await?;
719        Ok(count)
720    }
721
722    /// Return per-type active edge counts as `(edge_type, count)` pairs.
723    ///
724    /// # Errors
725    ///
726    /// Returns an error if the database query fails.
727    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
728        let rows: Vec<(String, i64)> = zeph_db::query_as(
729            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
730        )
731        .fetch_all(&self.pool)
732        .await?;
733        Ok(rows)
734    }
735
736    // ── Communities ───────────────────────────────────────────────────────────
737
738    /// Insert or update a community by name.
739    ///
740    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
741    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
742    /// `assign_to_community` adds an entity without a full re-detection pass).
743    ///
744    /// # Errors
745    ///
746    /// Returns an error if the database query fails or JSON serialization fails.
747    pub async fn upsert_community(
748        &self,
749        name: &str,
750        summary: &str,
751        entity_ids: &[i64],
752        fingerprint: Option<&str>,
753    ) -> Result<i64, MemoryError> {
754        let entity_ids_json = serde_json::to_string(entity_ids)?;
755        let id: i64 = zeph_db::query_scalar(sql!(
756            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
757             VALUES (?, ?, ?, ?)
758             ON CONFLICT(name) DO UPDATE SET
759               summary = excluded.summary,
760               entity_ids = excluded.entity_ids,
761               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
762               updated_at = CURRENT_TIMESTAMP
763             RETURNING id"
764        ))
765        .bind(name)
766        .bind(summary)
767        .bind(entity_ids_json)
768        .bind(fingerprint)
769        .fetch_one(&self.pool)
770        .await?;
771        Ok(id)
772    }
773
774    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
775    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
776    ///
777    /// # Errors
778    ///
779    /// Returns an error if the database query fails.
780    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
781        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
782            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
783        ))
784        .fetch_all(&self.pool)
785        .await?;
786        Ok(rows.into_iter().collect())
787    }
788
789    /// Delete a single community by its primary key.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the database query fails.
794    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
795        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
796            .bind(id)
797            .execute(&self.pool)
798            .await?;
799        Ok(())
800    }
801
802    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
803    ///
804    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
805    /// ensuring the next `detect_communities` run re-summarizes the affected community.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the database query fails.
810    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
811        zeph_db::query(sql!(
812            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
813        ))
814        .bind(id)
815        .execute(&self.pool)
816        .await?;
817        Ok(())
818    }
819
820    /// Find the first community that contains the given `entity_id`.
821    ///
822    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
823    /// table scan with per-row JSON parsing.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the database query fails or JSON parsing fails.
828    pub async fn community_for_entity(
829        &self,
830        entity_id: i64,
831    ) -> Result<Option<Community>, MemoryError> {
832        let row: Option<CommunityRow> = zeph_db::query_as(
833            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
834             FROM graph_communities c, json_each(c.entity_ids) j
835             WHERE CAST(j.value AS INTEGER) = ?
836             LIMIT 1"),
837        )
838        .bind(entity_id)
839        .fetch_optional(&self.pool)
840        .await?;
841        match row {
842            Some(row) => {
843                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
844                Ok(Some(Community {
845                    id: row.id,
846                    name: row.name,
847                    summary: row.summary,
848                    entity_ids,
849                    fingerprint: row.fingerprint,
850                    created_at: row.created_at,
851                    updated_at: row.updated_at,
852                }))
853            }
854            None => Ok(None),
855        }
856    }
857
858    /// Get all communities.
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if the database query fails or JSON parsing fails.
863    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
864        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
865            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
866             FROM graph_communities
867             ORDER BY id ASC"
868        ))
869        .fetch_all(&self.pool)
870        .await?;
871
872        rows.into_iter()
873            .map(|row| {
874                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
875                Ok(Community {
876                    id: row.id,
877                    name: row.name,
878                    summary: row.summary,
879                    entity_ids,
880                    fingerprint: row.fingerprint,
881                    created_at: row.created_at,
882                    updated_at: row.updated_at,
883                })
884            })
885            .collect()
886    }
887
888    /// Count the total number of communities.
889    ///
890    /// # Errors
891    ///
892    /// Returns an error if the database query fails.
893    pub async fn community_count(&self) -> Result<i64, MemoryError> {
894        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
895            .fetch_one(&self.pool)
896            .await?;
897        Ok(count)
898    }
899
900    // ── Metadata ──────────────────────────────────────────────────────────────
901
902    /// Get a metadata value by key.
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if the database query fails.
907    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
908        let val: Option<String> =
909            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
910                .bind(key)
911                .fetch_optional(&self.pool)
912                .await?;
913        Ok(val)
914    }
915
916    /// Set a metadata value by key (upsert).
917    ///
918    /// # Errors
919    ///
920    /// Returns an error if the database query fails.
921    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
922        zeph_db::query(sql!(
923            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
924             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
925        ))
926        .bind(key)
927        .bind(value)
928        .execute(&self.pool)
929        .await?;
930        Ok(())
931    }
932
933    /// Get the current extraction count from metadata.
934    ///
935    /// Returns 0 if the counter has not been initialized.
936    ///
937    /// # Errors
938    ///
939    /// Returns an error if the database query fails.
940    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
941        let val = self.get_metadata("extraction_count").await?;
942        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
943    }
944
945    /// Stream all active (non-invalidated) edges.
946    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
947        use futures::StreamExt as _;
948        zeph_db::query_as::<_, EdgeRow>(sql!(
949            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
950                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
951                    edge_type, retrieval_count, last_retrieved_at, superseded_by
952             FROM graph_edges
953             WHERE valid_to IS NULL
954             ORDER BY id ASC"
955        ))
956        .fetch(&self.pool)
957        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
958    }
959
960    /// Fetch a chunk of active edges using keyset pagination.
961    ///
962    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
963    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
964    /// the returned chunk as `after_id` for the next page. An empty result means all
965    /// edges have been consumed.
966    ///
967    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
968    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
969    /// and will appear in subsequent chunks or after the last chunk, never causing
970    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
971    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the database query fails.
976    pub async fn edges_after_id(
977        &self,
978        after_id: i64,
979        limit: i64,
980    ) -> Result<Vec<Edge>, MemoryError> {
981        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
982            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
983                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
984                    edge_type, retrieval_count, last_retrieved_at, superseded_by
985             FROM graph_edges
986             WHERE valid_to IS NULL AND id > ?
987             ORDER BY id ASC
988             LIMIT ?"
989        ))
990        .bind(after_id)
991        .bind(limit)
992        .fetch_all(&self.pool)
993        .await?;
994        Ok(rows.into_iter().map(edge_from_row).collect())
995    }
996
997    /// Find a community by its primary key.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the database query fails or JSON parsing fails.
1002    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1003        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1004            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1005             FROM graph_communities
1006             WHERE id = ?"
1007        ))
1008        .bind(id)
1009        .fetch_optional(&self.pool)
1010        .await?;
1011        match row {
1012            Some(row) => {
1013                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1014                Ok(Some(Community {
1015                    id: row.id,
1016                    name: row.name,
1017                    summary: row.summary,
1018                    entity_ids,
1019                    fingerprint: row.fingerprint,
1020                    created_at: row.created_at,
1021                    updated_at: row.updated_at,
1022                }))
1023            }
1024            None => Ok(None),
1025        }
1026    }
1027
1028    /// Delete all communities (full rebuild before upsert).
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns an error if the database query fails.
1033    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1034        zeph_db::query(sql!("DELETE FROM graph_communities"))
1035            .execute(&self.pool)
1036            .await?;
1037        Ok(())
1038    }
1039
1040    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1041
    /// Find entities matching `query` and return them with normalized FTS5 scores.
    ///
    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
    /// by dividing each negated BM25 value by the maximum in the result set.
    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
    ///
    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Row type for UNION ALL FTS5 query: (id, name, canonical_name, entity_type,
        // summary, first_seen_at, last_seen_at, qdrant_point_id, fts_rank).
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        // Bare FTS5 operator tokens would change match semantics; stripped below.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap the query at 512 bytes without splitting a UTF-8 character.
        // NOTE(review): `str::floor_char_boundary` is an unstable std API — confirm the
        // crate builds with a toolchain/feature that provides it.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Turn each surviving token into a prefix match ("tok*").
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        // UNION ALL with outer ORDER BY preserves FTS5 BM25 ordering through LIMIT.
        // Alias matches get a fixed raw score of 0.5 (below any real BM25 match).
        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
                        -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
                 FROM graph_entities_fts fts \
                 JOIN graph_entities e ON e.id = fts.rowid \
                 WHERE graph_entities_fts MATCH ? \
                 UNION ALL \
                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
                        0.5 AS fts_rank \
                 FROM graph_entity_aliases a \
                 JOIN graph_entities e ON e.id = a.entity_id \
                 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards in the raw query so `%`/`_` match literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Normalize FTS scores to [0, 1] by dividing by max; guard against div-by-zero.
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Deduplicate by entity ID (keep first/highest-ranked occurrence).
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings fall back to Concept rather than erroring.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1173
1174    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1175    ///
1176    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1177    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1178    /// Entities with no edges receive score `0.0`.
1179    ///
1180    /// # Errors
1181    ///
1182    /// Returns an error if the database query fails.
1183    pub async fn entity_structural_scores(
1184        &self,
1185        entity_ids: &[i64],
1186    ) -> Result<HashMap<i64, f32>, MemoryError> {
1187        // Each query binds entity_ids three times (three IN clauses).
1188        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1189        const MAX_BATCH: usize = 163;
1190
1191        if entity_ids.is_empty() {
1192            return Ok(HashMap::new());
1193        }
1194
1195        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1196        for chunk in entity_ids.chunks(MAX_BATCH) {
1197            let n = chunk.len();
1198            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1199            let ph1 = placeholder_list(1, n);
1200            let ph2 = placeholder_list(n + 1, n);
1201            let ph3 = placeholder_list(n * 2 + 1, n);
1202
1203            // Build query: count degree and distinct edge types for each entity.
1204            let sql = format!(
1205                "SELECT entity_id,
1206                        COUNT(*) AS degree,
1207                        COUNT(DISTINCT edge_type) AS type_diversity
1208                 FROM (
1209                     SELECT source_entity_id AS entity_id, edge_type
1210                     FROM graph_edges
1211                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1212                     UNION ALL
1213                     SELECT target_entity_id AS entity_id, edge_type
1214                     FROM graph_edges
1215                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1216                 )
1217                 WHERE entity_id IN ({ph3})
1218                 GROUP BY entity_id"
1219            );
1220
1221            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1222            // Bind chunk three times (three IN clauses)
1223            for id in chunk {
1224                query = query.bind(*id);
1225            }
1226            for id in chunk {
1227                query = query.bind(*id);
1228            }
1229            for id in chunk {
1230                query = query.bind(*id);
1231            }
1232
1233            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1234            all_rows.extend(chunk_rows);
1235        }
1236
1237        if all_rows.is_empty() {
1238            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1239        }
1240
1241        let max_degree = all_rows
1242            .iter()
1243            .map(|(_, d, _)| *d)
1244            .max()
1245            .unwrap_or(1)
1246            .max(1);
1247
1248        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1249        for (entity_id, degree, type_diversity) in all_rows {
1250            #[allow(clippy::cast_precision_loss)]
1251            let norm_degree = degree as f32 / max_degree as f32;
1252            #[allow(clippy::cast_precision_loss)]
1253            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1254            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1255            scores.insert(entity_id, score);
1256        }
1257
1258        Ok(scores)
1259    }
1260
1261    /// Look up community IDs for a batch of entity IDs.
1262    ///
1263    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1264    /// are absent from the map (treated as `None` by callers — no community cap applied).
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if the database query fails.
1269    pub async fn entity_community_ids(
1270        &self,
1271        entity_ids: &[i64],
1272    ) -> Result<HashMap<i64, i64>, MemoryError> {
1273        const MAX_BATCH: usize = 490;
1274
1275        if entity_ids.is_empty() {
1276            return Ok(HashMap::new());
1277        }
1278
1279        let mut result: HashMap<i64, i64> = HashMap::new();
1280        for chunk in entity_ids.chunks(MAX_BATCH) {
1281            let placeholders = placeholder_list(1, chunk.len());
1282
1283            let community_sql = community_ids_sql(&placeholders);
1284            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1285            for id in chunk {
1286                query = query.bind(*id);
1287            }
1288
1289            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1290            result.extend(rows);
1291        }
1292
1293        Ok(result)
1294    }
1295
1296    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1297    ///
1298    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1299    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1300    ///
1301    /// # Errors
1302    ///
1303    /// Returns an error if the database query fails.
1304    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1305        const MAX_BATCH: usize = 490;
1306        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1307        for chunk in edge_ids.chunks(MAX_BATCH) {
1308            let edge_placeholders = placeholder_list(1, chunk.len());
1309            let retrieval_sql = format!(
1310                "UPDATE graph_edges \
1311                 SET retrieval_count = retrieval_count + 1, \
1312                     last_retrieved_at = {epoch_now} \
1313                 WHERE id IN ({edge_placeholders})"
1314            );
1315            let mut q = zeph_db::query(&retrieval_sql);
1316            for id in chunk {
1317                q = q.bind(*id);
1318            }
1319            q.execute(&self.pool).await?;
1320        }
1321        Ok(())
1322    }
1323
1324    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1325    ///
1326    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1327    /// are updated. Returns the number of rows affected.
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if the database query fails.
1332    pub async fn decay_edge_retrieval_counts(
1333        &self,
1334        decay_lambda: f64,
1335        interval_secs: u64,
1336    ) -> Result<usize, MemoryError> {
1337        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1338        let decay_raw = format!(
1339            "UPDATE graph_edges \
1340             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1341             WHERE valid_to IS NULL \
1342               AND retrieval_count > 0 \
1343               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1344        );
1345        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1346        let result = zeph_db::query(&decay_sql)
1347            .bind(decay_lambda)
1348            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1349            .execute(&self.pool)
1350            .await?;
1351        Ok(usize::try_from(result.rows_affected())?)
1352    }
1353
1354    /// Delete expired edges older than `retention_days` and return count deleted.
1355    ///
1356    /// # Errors
1357    ///
1358    /// Returns an error if the database query fails.
1359    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1360        let days = i64::from(retention_days);
1361        let result = zeph_db::query(sql!(
1362            "DELETE FROM graph_edges
1363             WHERE expired_at IS NOT NULL
1364               AND expired_at < datetime('now', '-' || ? || ' days')"
1365        ))
1366        .bind(days)
1367        .execute(&self.pool)
1368        .await?;
1369        Ok(usize::try_from(result.rows_affected())?)
1370    }
1371
1372    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1373    ///
1374    /// # Errors
1375    ///
1376    /// Returns an error if the database query fails.
1377    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1378        let days = i64::from(retention_days);
1379        let result = zeph_db::query(sql!(
1380            "DELETE FROM graph_entities
1381             WHERE id NOT IN (
1382                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1383                 UNION
1384                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1385             )
1386             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1387        ))
1388        .bind(days)
1389        .execute(&self.pool)
1390        .await?;
1391        Ok(usize::try_from(result.rows_affected())?)
1392    }
1393
1394    /// Delete the oldest excess entities when count exceeds `max_entities`.
1395    ///
1396    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1397    /// Only deletes when `entity_count() > max_entities`.
1398    ///
1399    /// # Errors
1400    ///
1401    /// Returns an error if the database query fails.
1402    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1403        let current = self.entity_count().await?;
1404        let max = i64::try_from(max_entities)?;
1405        if current <= max {
1406            return Ok(0);
1407        }
1408        let excess = current - max;
1409        let result = zeph_db::query(sql!(
1410            "DELETE FROM graph_entities
1411             WHERE id IN (
1412                 SELECT e.id
1413                 FROM graph_entities e
1414                 LEFT JOIN (
1415                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1416                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1417                     UNION ALL
1418                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1419                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1420                 ) edge_counts ON e.id = edge_counts.eid
1421                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1422                 LIMIT ?
1423             )"
1424        ))
1425        .bind(excess)
1426        .execute(&self.pool)
1427        .await?;
1428        Ok(usize::try_from(result.rows_affected())?)
1429    }
1430
1431    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1432
    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
    ///
    /// An edge is valid at `timestamp` when:
    /// - `valid_from <= timestamp`, AND
    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
    ///
    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?
               AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?
               AND valid_to > ?
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        // Bind order mirrors the placeholders: branch 1 = (ts, src, tgt),
        // branch 2 = (ts, ts, src, tgt).
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1481
1482    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1483    ///
1484    /// The optional `relation` filter restricts results to a specific relation label.
1485    /// Results are ordered by `valid_from DESC` (most recent first).
1486    ///
1487    /// # Errors
1488    ///
1489    /// Returns an error if the database query fails.
1490    pub async fn edge_history(
1491        &self,
1492        source_entity_id: i64,
1493        predicate: &str,
1494        relation: Option<&str>,
1495        limit: usize,
1496    ) -> Result<Vec<Edge>, MemoryError> {
1497        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1498        let escaped = predicate
1499            .replace('\\', "\\\\")
1500            .replace('%', "\\%")
1501            .replace('_', "\\_");
1502        let like_pattern = format!("%{escaped}%");
1503        let limit = i64::try_from(limit)?;
1504        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1505            zeph_db::query_as(sql!(
1506                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1507                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1508                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1509                 FROM graph_edges
1510                 WHERE source_entity_id = ?
1511                   AND fact LIKE ? ESCAPE '\\'
1512                   AND relation = ?
1513                 ORDER BY valid_from DESC
1514                 LIMIT ?"
1515            ))
1516            .bind(source_entity_id)
1517            .bind(&like_pattern)
1518            .bind(rel)
1519            .bind(limit)
1520            .fetch_all(&self.pool)
1521            .await?
1522        } else {
1523            zeph_db::query_as(sql!(
1524                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1525                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1526                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1527                 FROM graph_edges
1528                 WHERE source_entity_id = ?
1529                   AND fact LIKE ? ESCAPE '\\'
1530                 ORDER BY valid_from DESC
1531                 LIMIT ?"
1532            ))
1533            .bind(source_entity_id)
1534            .bind(&like_pattern)
1535            .bind(limit)
1536            .fetch_all(&self.pool)
1537            .await?
1538        };
1539        Ok(rows.into_iter().map(edge_from_row).collect())
1540    }
1541
1542    // ── BFS Traversal ─────────────────────────────────────────────────────────
1543
1544    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1545    ///
1546    /// Returns all reachable entities and the active edges connecting them.
1547    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1548    /// of `SQLite` CTE limitations.
1549    ///
1550    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1551    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1552    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1553    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1554    ///
1555    /// # Errors
1556    ///
1557    /// Returns an error if any database query fails.
1558    pub async fn bfs(
1559        &self,
1560        start_entity_id: i64,
1561        max_hops: u32,
1562    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1563        self.bfs_with_depth(start_entity_id, max_hops)
1564            .await
1565            .map(|(e, ed, _)| (e, ed))
1566    }
1567
1568    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1569    ///
1570    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1571    /// entity. The start entity itself has depth 0.
1572    ///
1573    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
1574    ///
1575    /// # Errors
1576    ///
1577    /// Returns an error if any database query fails.
1578    pub async fn bfs_with_depth(
1579        &self,
1580        start_entity_id: i64,
1581        max_hops: u32,
1582    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1583        self.bfs_core(start_entity_id, max_hops, None).await
1584    }
1585
1586    /// BFS traversal considering only edges that were valid at `timestamp`.
1587    ///
1588    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1589    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1590    ///
1591    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1592    ///
1593    /// # Errors
1594    ///
1595    /// Returns an error if any database query fails.
1596    pub async fn bfs_at_timestamp(
1597        &self,
1598        start_entity_id: i64,
1599        max_hops: u32,
1600        timestamp: &str,
1601    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1602        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1603            .await
1604    }
1605
1606    /// BFS traversal scoped to specific MAGMA edge types.
1607    ///
1608    /// When `edge_types` is empty, behaves identically to [`bfs_with_depth`] (traverses all
1609    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1610    /// matches one of the provided types.
1611    ///
1612    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1613    /// edges, a temporal query only temporal + semantic edges, etc.
1614    ///
1615    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1616    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1617    ///
1618    /// # Errors
1619    ///
1620    /// Returns an error if any database query fails.
1621    pub async fn bfs_typed(
1622        &self,
1623        start_entity_id: i64,
1624        max_hops: u32,
1625        edge_types: &[EdgeType],
1626    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1627        if edge_types.is_empty() {
1628            return self.bfs_with_depth(start_entity_id, max_hops).await;
1629        }
1630        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1631            .await
1632    }
1633
    /// Shared BFS implementation.
    ///
    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
    ///
    /// Traversal is undirected: for each edge touching the frontier, the CASE expression
    /// selects the endpoint opposite the frontier side as the neighbour.
    ///
    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
    /// format string, so there is no SQL injection risk.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
        const MAX_FRONTIER: usize = 300;

        // depth_map doubles as the visited set: an entity is enqueued at most once,
        // which both guarantees termination on cycles and records minimum hop distance.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            // Silently drops frontier entries beyond the cap — documented limitation
            // (see the bind-limit note on `bfs`).
            frontier.truncate(MAX_FRONTIER);
            // IDs come from our own DB — no user input, no injection risk.
            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
            // Timestamp (if any) follows at position 3n+1.
            let n = frontier.len();
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = n * 3 + 1;
                // The same numbered placeholder appears twice in the predicate but is
                // bound only once below — requires numbered-placeholder syntax.
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order must mirror the placeholder positions above: frontier ×3, then ts.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only first-time entities enter the next frontier (Vacant = unvisited).
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1713
    /// BFS implementation scoped to specific edge types.
    ///
    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
    ///
    /// Structure mirrors [`bfs_core`]; the only differences are the constant
    /// `edge_type IN (...)` prefix in the filter and the shifted bind positions.
    ///
    /// # Errors
    ///
    /// Returns an error if any database query fails.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Same frontier cap as bfs_core: 3 copies of frontier IDs per hop must stay
        // within SQLite's default 999-bind limit (plus the small type/timestamp overhead).
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        // depth_map doubles as the visited set — guarantees cycle termination.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        let n_types = type_strs.len();
        // type_in is constant for the entire BFS — positions 1..=n_types never change.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                // Timestamp sits after the types and all three frontier copies.
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind types first
            for t in &type_strs {
                q = q.bind(*t);
            }
            // Bind frontier 3 times
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only unvisited entities advance to the next hop.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1810
1811    /// Fetch entities and typed edges for a completed BFS depth map.
1812    ///
1813    /// Filters returned edges by the provided `edge_type` strings.
1814    ///
1815    /// # Errors
1816    ///
1817    /// Returns an error if any database query fails.
1818    async fn bfs_fetch_results_typed(
1819        &self,
1820        depth_map: std::collections::HashMap<i64, u32>,
1821        at_timestamp: Option<&str>,
1822        type_strs: &[&str],
1823    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1824        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1825        if visited_ids.is_empty() {
1826            return Ok((Vec::new(), Vec::new(), depth_map));
1827        }
1828        if visited_ids.len() > 499 {
1829            tracing::warn!(
1830                total = visited_ids.len(),
1831                retained = 499,
1832                "bfs_fetch_results_typed: visited entity set truncated to 499"
1833            );
1834            visited_ids.truncate(499);
1835        }
1836
1837        let n_types = type_strs.len();
1838        let n_visited = visited_ids.len();
1839
1840        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1841        let type_in = placeholder_list(1, n_types);
1842        let id_start = n_types + 1;
1843        let ph_ids1 = placeholder_list(id_start, n_visited);
1844        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1845
1846        let edge_filter = if at_timestamp.is_some() {
1847            let ts_pos = id_start + n_visited * 2;
1848            format!(
1849                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1850                ts = numbered_placeholder(ts_pos),
1851            )
1852        } else {
1853            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1854        };
1855
1856        let edge_sql = format!(
1857            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1858                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1859                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1860             FROM graph_edges
1861             WHERE {edge_filter}
1862               AND source_entity_id IN ({ph_ids1})
1863               AND target_entity_id IN ({ph_ids2})"
1864        );
1865        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1866        for t in type_strs {
1867            edge_query = edge_query.bind(*t);
1868        }
1869        for id in &visited_ids {
1870            edge_query = edge_query.bind(*id);
1871        }
1872        for id in &visited_ids {
1873            edge_query = edge_query.bind(*id);
1874        }
1875        if let Some(ts) = at_timestamp {
1876            edge_query = edge_query.bind(ts);
1877        }
1878        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1879
1880        // For entity query, use plain sequential bind positions (no type prefix offset)
1881        let entity_sql2 = format!(
1882            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1883             FROM graph_entities WHERE id IN ({ph})",
1884            ph = placeholder_list(1, visited_ids.len()),
1885        );
1886        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1887        for id in &visited_ids {
1888            entity_query = entity_query.bind(*id);
1889        }
1890        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1891
1892        let entities: Vec<Entity> = entity_rows
1893            .into_iter()
1894            .map(entity_from_row)
1895            .collect::<Result<Vec<_>, _>>()?;
1896        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1897
1898        Ok((entities, edges, depth_map))
1899    }
1900
    /// Fetch entities and edges for a completed BFS depth map.
    ///
    /// Returns the entity records for every visited ID, the connecting edges (active,
    /// or valid at `at_timestamp` when one is given), and the depth map passed in.
    /// Only edges with BOTH endpoints inside the visited set are returned.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
        // (With a timestamp that is 2*499 + 1 = 999 binds, exactly at the default limit.)
        // NOTE(review): HashMap iteration order is unspecified, so WHICH entities get
        // dropped by the truncation is arbitrary.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        let n = visited_ids.len();
        // Bind positions: visited IDs twice (1..n, n+1..2n), optional timestamp at 2n+1.
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            // The same numbered placeholder appears twice but is bound only once below.
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Separate statement, so placeholder numbering restarts at 1.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // Entity conversion is fallible (entity_type parse); edge conversion is not.
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1974
1975    // ── Backfill helpers ──────────────────────────────────────────────────────
1976
1977    /// Find an entity by name only (no type filter).
1978    ///
1979    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1980    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1981    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1982    ///
1983    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1984    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1985    /// contains "Alice").
1986    ///
1987    /// # Errors
1988    ///
1989    /// Returns an error if the database query fails.
1990    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1991        let find_by_name_sql = format!(
1992            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
1993             FROM graph_entities \
1994             WHERE name = ? {cn} OR canonical_name = ? {cn} \
1995             LIMIT 5",
1996            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1997        );
1998        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
1999            .bind(name)
2000            .bind(name)
2001            .fetch_all(&self.pool)
2002            .await?;
2003
2004        if !rows.is_empty() {
2005            return rows.into_iter().map(entity_from_row).collect();
2006        }
2007
2008        self.find_entities_fuzzy(name, 5).await
2009    }
2010
2011    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2012    ///
2013    /// Reads the `graph_processed` column added by migration 021.
2014    ///
2015    /// # Errors
2016    ///
2017    /// Returns an error if the database query fails.
2018    pub async fn unprocessed_messages_for_backfill(
2019        &self,
2020        limit: usize,
2021    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2022        let limit = i64::try_from(limit)?;
2023        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2024            "SELECT id, content FROM messages
2025             WHERE graph_processed = 0
2026             ORDER BY id ASC
2027             LIMIT ?"
2028        ))
2029        .bind(limit)
2030        .fetch_all(&self.pool)
2031        .await?;
2032        Ok(rows
2033            .into_iter()
2034            .map(|(id, content)| (crate::types::MessageId(id), content))
2035            .collect())
2036    }
2037
2038    /// Return the count of messages not yet processed by graph extraction.
2039    ///
2040    /// # Errors
2041    ///
2042    /// Returns an error if the database query fails.
2043    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2044        let count: i64 = zeph_db::query_scalar(sql!(
2045            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2046        ))
2047        .fetch_one(&self.pool)
2048        .await?;
2049        Ok(count)
2050    }
2051
2052    /// Mark a batch of messages as graph-processed.
2053    ///
2054    /// # Errors
2055    ///
2056    /// Returns an error if the database query fails.
2057    pub async fn mark_messages_graph_processed(
2058        &self,
2059        ids: &[crate::types::MessageId],
2060    ) -> Result<(), MemoryError> {
2061        const MAX_BATCH: usize = 490;
2062        if ids.is_empty() {
2063            return Ok(());
2064        }
2065        for chunk in ids.chunks(MAX_BATCH) {
2066            let placeholders = placeholder_list(1, chunk.len());
2067            let sql =
2068                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2069            let mut query = zeph_db::query(&sql);
2070            for id in chunk {
2071                query = query.bind(id.0);
2072            }
2073            query.execute(&self.pool).await?;
2074        }
2075        Ok(())
2076    }
2077}
2078
2079// ── Dialect helpers ───────────────────────────────────────────────────────────
2080
/// Builds the SQL mapping entity IDs to their community IDs (`SQLite` variant).
///
/// Expands each community's JSON-encoded `entity_ids` array with `json_each`,
/// yielding `(entity_id, community_id)` pairs restricted to the given
/// placeholder ID list. `placeholders` is a pre-built placeholder string —
/// never raw user input.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2089
/// Builds the SQL mapping entity IDs to their community IDs (Postgres variant).
///
/// Same contract as the `SQLite` variant, but expands the JSON array with
/// `jsonb_array_elements_text` and casts values via `::bigint`.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2099
2100// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2101
/// Raw row shape for `graph_entities` queries; converted to the public
/// [`Entity`] by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Display form (original casing preserved; see upsert_entity docs).
    name: String,
    // Normalized deduplication key.
    canonical_name: String,
    // Stored as text; parsed into `EntityType` in `entity_from_row`.
    entity_type: String,
    summary: Option<String>,
    // Datetime strings as stored by the DB — presumably "YYYY-MM-DD HH:MM:SS"
    // like the timestamps used elsewhere in this module; confirm against schema.
    first_seen_at: String,
    last_seen_at: String,
    // Present once the entity has an associated vector point (name suggests Qdrant).
    qdrant_point_id: Option<String>,
}
2113
2114fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2115    let entity_type = row
2116        .entity_type
2117        .parse::<EntityType>()
2118        .map_err(MemoryError::GraphStore)?;
2119    Ok(Entity {
2120        id: row.id,
2121        name: row.name,
2122        canonical_name: row.canonical_name,
2123        entity_type,
2124        summary: row.summary,
2125        first_seen_at: row.first_seen_at,
2126        last_seen_at: row.last_seen_at,
2127        qdrant_point_id: row.qdrant_point_id,
2128    })
2129}
2130
/// Raw row shape for entity-alias queries; converted to the public
/// [`EntityAlias`] by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias resolves to.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2138
2139fn alias_from_row(row: AliasRow) -> EntityAlias {
2140    EntityAlias {
2141        id: row.id,
2142        entity_id: row.entity_id,
2143        alias_name: row.alias_name,
2144        created_at: row.created_at,
2145    }
2146}
2147
/// Raw row shape for `graph_edges` queries; converted to the public
/// [`Edge`] by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 in `edge_from_row`.
    confidence: f64,
    // Bitemporal validity window: `valid_to == None` means the edge is still active.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Wrapped into `MessageId` in `edge_from_row`.
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` (fallback: Semantic) in `edge_from_row`.
    edge_type: String,
    retrieval_count: i32,
    // Integer timestamp — presumably an epoch value; confirm against schema.
    last_retrieved_at: Option<i64>,
    superseded_by: Option<i64>,
}
2167
2168fn edge_from_row(row: EdgeRow) -> Edge {
2169    let edge_type = row
2170        .edge_type
2171        .parse::<EdgeType>()
2172        .unwrap_or(EdgeType::Semantic);
2173    Edge {
2174        id: row.id,
2175        source_entity_id: row.source_entity_id,
2176        target_entity_id: row.target_entity_id,
2177        relation: row.relation,
2178        fact: row.fact,
2179        #[allow(clippy::cast_possible_truncation)]
2180        confidence: row.confidence as f32,
2181        valid_from: row.valid_from,
2182        valid_to: row.valid_to,
2183        created_at: row.created_at,
2184        expired_at: row.expired_at,
2185        episode_id: row.episode_id.map(MessageId),
2186        qdrant_point_id: row.qdrant_point_id,
2187        edge_type,
2188        retrieval_count: row.retrieval_count,
2189        last_retrieved_at: row.last_retrieved_at,
2190        superseded_by: row.superseded_by,
2191    }
2192}
2193
/// Raw row shape for `graph_communities` queries.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON array of member entity IDs stored as text; expanded server-side by
    // `community_ids_sql` (json_each / jsonb_array_elements_text).
    entity_ids: String,
    // NOTE(review): presumably a content hash used to detect when the community
    // changed — confirm against the writer of this column.
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2204
2205// ── Tests ─────────────────────────────────────────────────────────────────────
2206
2207#[cfg(test)]
2208mod tests;