Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed knowledge graph store.
5//!
6//! [`GraphStore`] manages the `graph_entities`, `graph_edges`, `graph_entity_aliases`,
7//! `graph_communities`, and `graph_episodes` tables. It is the persistence layer for
8//! the MAGMA / GAAMA graph subsystem.
9
10use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::types::MessageId;
20
21use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
22
/// Database-backed persistence layer for the knowledge graph.
///
/// All graph mutations go through this type: entity upserts, edge creation,
/// alias recording, community assignment, and episode management.
///
/// Obtain an instance via [`GraphStore::new`] after running the `zeph-db` migrations.
pub struct GraphStore {
    /// Connection pool that every query issued by this store runs against.
    pool: DbPool,
}
32
33impl GraphStore {
34    /// Create a new `GraphStore` wrapping `pool`.
35    ///
36    /// The pool must come from a database with the `zeph-db` migrations applied.
37    #[must_use]
38    pub fn new(pool: DbPool) -> Self {
39        Self { pool }
40    }
41
42    /// Access the underlying database pool.
43    #[must_use]
44    pub fn pool(&self) -> &DbPool {
45        &self.pool
46    }
47
48    // ── Entities ─────────────────────────────────────────────────────────────
49
50    /// Insert or update an entity by `(canonical_name, entity_type)`.
51    ///
52    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
53    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
54    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
55    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the database query fails.
60    pub async fn upsert_entity(
61        &self,
62        surface_name: &str,
63        canonical_name: &str,
64        entity_type: EntityType,
65        summary: Option<&str>,
66    ) -> Result<i64, MemoryError> {
67        let type_str = entity_type.as_str();
68        let id: i64 = zeph_db::query_scalar(sql!(
69            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
70             VALUES (?, ?, ?, ?)
71             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
72               name = excluded.name,
73               summary = COALESCE(excluded.summary, summary),
74               last_seen_at = CURRENT_TIMESTAMP
75             RETURNING id"
76        ))
77        .bind(surface_name)
78        .bind(canonical_name)
79        .bind(type_str)
80        .bind(summary)
81        .fetch_one(&self.pool)
82        .await?;
83        Ok(id)
84    }
85
86    /// Find an entity by exact canonical name and type.
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the database query fails.
91    pub async fn find_entity(
92        &self,
93        canonical_name: &str,
94        entity_type: EntityType,
95    ) -> Result<Option<Entity>, MemoryError> {
96        let type_str = entity_type.as_str();
97        let row: Option<EntityRow> = zeph_db::query_as(
98            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
99             FROM graph_entities
100             WHERE canonical_name = ? AND entity_type = ?"),
101        )
102        .bind(canonical_name)
103        .bind(type_str)
104        .fetch_optional(&self.pool)
105        .await?;
106        row.map(entity_from_row).transpose()
107    }
108
109    /// Find an entity by its numeric ID.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the database query fails.
114    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
115        let row: Option<EntityRow> = zeph_db::query_as(
116            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
117             FROM graph_entities
118             WHERE id = ?"),
119        )
120        .bind(entity_id)
121        .fetch_optional(&self.pool)
122        .await?;
123        row.map(entity_from_row).transpose()
124    }
125
126    /// Update the `qdrant_point_id` for an entity.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the database query fails.
131    pub async fn set_entity_qdrant_point_id(
132        &self,
133        entity_id: i64,
134        point_id: &str,
135    ) -> Result<(), MemoryError> {
136        zeph_db::query(sql!(
137            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
138        ))
139        .bind(point_id)
140        .bind(entity_id)
141        .execute(&self.pool)
142        .await?;
143        Ok(())
144    }
145
146    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
147    ///
148    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
149    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
150    /// alias matches via a UNION query.
151    ///
152    /// # Behavioral note
153    ///
154    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
155    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
156    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
157    /// mid-word substrings are not matched. This is a known trade-off for index performance.
158    ///
159    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
160    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
161    /// causes noise in practice, add a minimum length guard at the call site.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the database query fails.
166    pub async fn find_entities_fuzzy(
167        &self,
168        query: &str,
169        limit: usize,
170    ) -> Result<Vec<Entity>, MemoryError> {
171        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
172        // prevents syntax errors when user input contains them as literal search terms
173        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
174        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
175        let query = &query[..query.floor_char_boundary(512)];
176        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
177        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
178        let sanitized = sanitize_fts_query(query);
179        if sanitized.is_empty() {
180            return Ok(vec![]);
181        }
182        let fts_query: String = sanitized
183            .split_whitespace()
184            .filter(|t| !FTS5_OPERATORS.contains(t))
185            .map(|t| format!("{t}*"))
186            .collect::<Vec<_>>()
187            .join(" ");
188        if fts_query.is_empty() {
189            return Ok(vec![]);
190        }
191
192        let limit = i64::try_from(limit)?;
193        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
194        // bm25() returns negative values; ORDER BY ASC puts best matches first.
195        let search_sql = format!(
196            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
197                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
198             FROM graph_entities_fts fts \
199             JOIN graph_entities e ON e.id = fts.rowid \
200             WHERE graph_entities_fts MATCH ? \
201             UNION \
202             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
203                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
204             FROM graph_entity_aliases a \
205             JOIN graph_entities e ON e.id = a.entity_id \
206             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
207             LIMIT ?",
208            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
209        );
210        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
211            .bind(&fts_query)
212            .bind(format!(
213                "%{}%",
214                query
215                    .trim()
216                    .replace('\\', "\\\\")
217                    .replace('%', "\\%")
218                    .replace('_', "\\_")
219            ))
220            .bind(limit)
221            .fetch_all(&self.pool)
222            .await?;
223        rows.into_iter()
224            .map(entity_from_row)
225            .collect::<Result<Vec<_>, _>>()
226    }
227
228    /// Flush the `SQLite` WAL to the main database file.
229    ///
230    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
231    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
232    /// visible to connections opened in future sessions.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the PRAGMA execution fails.
237    #[cfg(feature = "sqlite")]
238    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
239        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
240            .execute(&self.pool)
241            .await?;
242        Ok(())
243    }
244
    /// `PostgreSQL` counterpart of the `SQLite` WAL checkpoint: does nothing.
    ///
    /// Exists so callers can invoke `checkpoint_wal` under either backend feature; on
    /// `PostgreSQL` WAL management is handled by the server.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
254
255    /// Stream all entities from the database incrementally (true cursor, no full-table load).
256    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
257        use futures::StreamExt as _;
258        zeph_db::query_as::<_, EntityRow>(
259            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
260             FROM graph_entities ORDER BY id ASC"),
261        )
262        .fetch(&self.pool)
263        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
264            r.map_err(MemoryError::from).and_then(entity_from_row)
265        })
266    }
267
268    // ── Alias methods ─────────────────────────────────────────────────────────
269
270    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if the database query fails.
275    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
276        let insert_alias_sql = format!(
277            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
278            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
279            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
280        );
281        zeph_db::query(&insert_alias_sql)
282            .bind(entity_id)
283            .bind(alias_name)
284            .execute(&self.pool)
285            .await?;
286        Ok(())
287    }
288
289    /// Find an entity by alias name and entity type (case-insensitive).
290    ///
291    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the database query fails.
296    pub async fn find_entity_by_alias(
297        &self,
298        alias_name: &str,
299        entity_type: EntityType,
300    ) -> Result<Option<Entity>, MemoryError> {
301        let type_str = entity_type.as_str();
302        let alias_typed_sql = format!(
303            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
304                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
305             FROM graph_entity_aliases a \
306             JOIN graph_entities e ON e.id = a.entity_id \
307             WHERE a.alias_name = ? {} \
308               AND e.entity_type = ? \
309             ORDER BY e.id ASC \
310             LIMIT 1",
311            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
312        );
313        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
314            .bind(alias_name)
315            .bind(type_str)
316            .fetch_optional(&self.pool)
317            .await?;
318        row.map(entity_from_row).transpose()
319    }
320
321    /// Get all aliases for an entity.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the database query fails.
326    pub async fn aliases_for_entity(
327        &self,
328        entity_id: i64,
329    ) -> Result<Vec<EntityAlias>, MemoryError> {
330        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
331            "SELECT id, entity_id, alias_name, created_at
332             FROM graph_entity_aliases
333             WHERE entity_id = ?
334             ORDER BY id ASC"
335        ))
336        .bind(entity_id)
337        .fetch_all(&self.pool)
338        .await?;
339        Ok(rows.into_iter().map(alias_from_row).collect())
340    }
341
342    /// Collect all entities into a Vec.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the database query fails or `entity_type` parsing fails.
347    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
348        use futures::TryStreamExt as _;
349        self.all_entities_stream().try_collect().await
350    }
351
352    /// Count the total number of entities.
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if the database query fails.
357    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
358        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
359            .fetch_one(&self.pool)
360            .await?;
361        Ok(count)
362    }
363
364    // ── Edges ─────────────────────────────────────────────────────────────────
365
366    /// Insert a new edge between two entities, or update the existing active edge.
367    ///
368    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
369    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
370    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
371    /// duplicate edges from repeated extraction of the same context messages.
372    ///
373    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
374    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
375    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
376    /// update the first and lose the type classification.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if the database query fails.
381    pub async fn insert_edge(
382        &self,
383        source_entity_id: i64,
384        target_entity_id: i64,
385        relation: &str,
386        fact: &str,
387        confidence: f32,
388        episode_id: Option<MessageId>,
389    ) -> Result<i64, MemoryError> {
390        self.insert_edge_typed(
391            source_entity_id,
392            target_entity_id,
393            relation,
394            fact,
395            confidence,
396            episode_id,
397            EdgeType::Semantic,
398        )
399        .await
400    }
401
402    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
403    ///
404    /// Identical semantics to [`Self::insert_edge`] but with an explicit `edge_type` parameter.
405    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the database query fails.
410    #[allow(clippy::too_many_arguments)]
411    pub async fn insert_edge_typed(
412        &self,
413        source_entity_id: i64,
414        target_entity_id: i64,
415        relation: &str,
416        fact: &str,
417        confidence: f32,
418        episode_id: Option<MessageId>,
419        edge_type: EdgeType,
420    ) -> Result<i64, MemoryError> {
421        if source_entity_id == target_entity_id {
422            return Err(MemoryError::InvalidInput(format!(
423                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
424            )));
425        }
426        let confidence = confidence.clamp(0.0, 1.0);
427        let edge_type_str = edge_type.as_str();
428
429        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
430        // between existence check and write. The unique partial index uq_graph_edges_active
431        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
432        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
433        let mut tx = zeph_db::begin(&self.pool).await?;
434
435        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
436            "SELECT id, confidence FROM graph_edges
437             WHERE source_entity_id = ?
438               AND target_entity_id = ?
439               AND relation = ?
440               AND edge_type = ?
441               AND valid_to IS NULL
442             LIMIT 1"
443        ))
444        .bind(source_entity_id)
445        .bind(target_entity_id)
446        .bind(relation)
447        .bind(edge_type_str)
448        .fetch_optional(&mut *tx)
449        .await?;
450
451        if let Some((existing_id, stored_conf)) = existing {
452            let updated_conf = f64::from(confidence).max(stored_conf);
453            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
454                .bind(updated_conf)
455                .bind(existing_id)
456                .execute(&mut *tx)
457                .await?;
458            tx.commit().await?;
459            return Ok(existing_id);
460        }
461
462        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
463        let id: i64 = zeph_db::query_scalar(sql!(
464            "INSERT INTO graph_edges
465             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
466             VALUES (?, ?, ?, ?, ?, ?, ?)
467             RETURNING id"
468        ))
469        .bind(source_entity_id)
470        .bind(target_entity_id)
471        .bind(relation)
472        .bind(fact)
473        .bind(f64::from(confidence))
474        .bind(episode_raw)
475        .bind(edge_type_str)
476        .fetch_one(&mut *tx)
477        .await?;
478        tx.commit().await?;
479        Ok(id)
480    }
481
482    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
483    ///
484    /// # Errors
485    ///
486    /// Returns an error if the database update fails.
487    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
488        zeph_db::query(sql!(
489            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
490             WHERE id = ?"
491        ))
492        .bind(edge_id)
493        .execute(&self.pool)
494        .await?;
495        Ok(())
496    }
497
498    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
499    ///
500    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the database update fails.
505    pub async fn invalidate_edge_with_supersession(
506        &self,
507        old_edge_id: i64,
508        new_edge_id: i64,
509    ) -> Result<(), MemoryError> {
510        zeph_db::query(sql!(
511            "UPDATE graph_edges
512             SET valid_to = CURRENT_TIMESTAMP,
513                 expired_at = CURRENT_TIMESTAMP,
514                 superseded_by = ?
515             WHERE id = ?"
516        ))
517        .bind(new_edge_id)
518        .bind(old_edge_id)
519        .execute(&self.pool)
520        .await?;
521        Ok(())
522    }
523
524    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
525    ///
526    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
527    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
528    /// in v1 — see `bfs_at_timestamp` for historical traversal).
529    ///
530    /// # `SQLite` bind limit safety
531    ///
532    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
533    /// default). Each entity ID requires two bind slots (source OR target), so batches are
534    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
535    /// compile-time `SQLite` configuration.
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if the database query fails.
540    pub async fn edges_for_entities(
541        &self,
542        entity_ids: &[i64],
543        edge_types: &[super::types::EdgeType],
544    ) -> Result<Vec<Edge>, MemoryError> {
545        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
546        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
547        // 490 * 2 = 980, leaving headroom for future query additions.
548        const MAX_BATCH_ENTITIES: usize = 490;
549
550        let mut all_edges: Vec<Edge> = Vec::new();
551
552        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
553            let edges = self.query_batch_edges(chunk, edge_types).await?;
554            all_edges.extend(edges);
555        }
556
557        Ok(all_edges)
558    }
559
560    /// Query active edges for a single chunk of entity IDs (internal helper).
561    ///
562    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
563    async fn query_batch_edges(
564        &self,
565        entity_ids: &[i64],
566        edge_types: &[super::types::EdgeType],
567    ) -> Result<Vec<Edge>, MemoryError> {
568        if entity_ids.is_empty() {
569            return Ok(Vec::new());
570        }
571
572        // Build a parameterized IN clause with backend-appropriate placeholders.
573        // We cannot use the sql! macro here because the placeholder count is dynamic.
574        let n_ids = entity_ids.len();
575        let n_types = edge_types.len();
576
577        let sql = if n_types == 0 {
578            // placeholders used twice (source IN and target IN)
579            let placeholders = placeholder_list(1, n_ids);
580            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
581            format!(
582                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
583                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
584                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
585                 FROM graph_edges
586                 WHERE valid_to IS NULL
587                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
588            )
589        } else {
590            let placeholders = placeholder_list(1, n_ids);
591            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
592            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
593            format!(
594                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
595                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
596                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
597                 FROM graph_edges
598                 WHERE valid_to IS NULL
599                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
600                   AND edge_type IN ({type_placeholders})"
601            )
602        };
603
604        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
605        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
606        for id in entity_ids {
607            query = query.bind(*id);
608        }
609        for id in entity_ids {
610            query = query.bind(*id);
611        }
612        for et in edge_types {
613            query = query.bind(et.as_str());
614        }
615
616        // Wrap pool.acquire() + query execution in a short timeout to prevent the outer
617        // tokio::time::timeout (in SemanticMemory recall) from cancelling a mid-acquire
618        // future, which causes sqlx 0.8 semaphore count drift and permanent pool starvation.
619        let rows: Vec<EdgeRow> = tokio::time::timeout(
620            std::time::Duration::from_millis(500),
621            query.fetch_all(&self.pool),
622        )
623        .await
624        .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
625        Ok(rows.into_iter().map(edge_from_row).collect())
626    }
627
628    /// Get all active edges where entity is source or target.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the database query fails.
633    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
634        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
635            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
636                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
637                    edge_type, retrieval_count, last_retrieved_at, superseded_by
638             FROM graph_edges
639             WHERE valid_to IS NULL
640               AND (source_entity_id = ? OR target_entity_id = ?)"
641        ))
642        .bind(entity_id)
643        .bind(entity_id)
644        .fetch_all(&self.pool)
645        .await?;
646        Ok(rows.into_iter().map(edge_from_row).collect())
647    }
648
649    /// Get all edges (active and expired) where entity is source or target, ordered by
650    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the database query fails or if `limit` overflows `i64`.
655    pub async fn edge_history_for_entity(
656        &self,
657        entity_id: i64,
658        limit: usize,
659    ) -> Result<Vec<Edge>, MemoryError> {
660        let limit = i64::try_from(limit)?;
661        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
662            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
663                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
664                    edge_type, retrieval_count, last_retrieved_at, superseded_by
665             FROM graph_edges
666             WHERE source_entity_id = ? OR target_entity_id = ?
667             ORDER BY valid_from DESC
668             LIMIT ?"
669        ))
670        .bind(entity_id)
671        .bind(entity_id)
672        .bind(limit)
673        .fetch_all(&self.pool)
674        .await?;
675        Ok(rows.into_iter().map(edge_from_row).collect())
676    }
677
678    /// Get all active edges between two entities (both directions).
679    ///
680    /// # Errors
681    ///
682    /// Returns an error if the database query fails.
683    pub async fn edges_between(
684        &self,
685        entity_a: i64,
686        entity_b: i64,
687    ) -> Result<Vec<Edge>, MemoryError> {
688        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
689            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
690                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
691                    edge_type, retrieval_count, last_retrieved_at, superseded_by
692             FROM graph_edges
693             WHERE valid_to IS NULL
694               AND ((source_entity_id = ? AND target_entity_id = ?)
695                 OR (source_entity_id = ? AND target_entity_id = ?))"
696        ))
697        .bind(entity_a)
698        .bind(entity_b)
699        .bind(entity_b)
700        .bind(entity_a)
701        .fetch_all(&self.pool)
702        .await?;
703        Ok(rows.into_iter().map(edge_from_row).collect())
704    }
705
706    /// Get active edges from `source` to `target` in the exact direction (no reverse).
707    ///
708    /// # Errors
709    ///
710    /// Returns an error if the database query fails.
711    pub async fn edges_exact(
712        &self,
713        source_entity_id: i64,
714        target_entity_id: i64,
715    ) -> Result<Vec<Edge>, MemoryError> {
716        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
717            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
718                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
719                    edge_type, retrieval_count, last_retrieved_at, superseded_by
720             FROM graph_edges
721             WHERE valid_to IS NULL
722               AND source_entity_id = ?
723               AND target_entity_id = ?"
724        ))
725        .bind(source_entity_id)
726        .bind(target_entity_id)
727        .fetch_all(&self.pool)
728        .await?;
729        Ok(rows.into_iter().map(edge_from_row).collect())
730    }
731
732    /// Count active (non-invalidated) edges.
733    ///
734    /// # Errors
735    ///
736    /// Returns an error if the database query fails.
737    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
738        let count: i64 = zeph_db::query_scalar(sql!(
739            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
740        ))
741        .fetch_one(&self.pool)
742        .await?;
743        Ok(count)
744    }
745
746    /// Return per-type active edge counts as `(edge_type, count)` pairs.
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if the database query fails.
751    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
752        let rows: Vec<(String, i64)> = zeph_db::query_as(
753            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
754        )
755        .fetch_all(&self.pool)
756        .await?;
757        Ok(rows)
758    }
759
760    // ── Communities ───────────────────────────────────────────────────────────
761
762    /// Insert or update a community by name.
763    ///
764    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
765    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
766    /// `assign_to_community` adds an entity without a full re-detection pass).
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if the database query fails or JSON serialization fails.
771    pub async fn upsert_community(
772        &self,
773        name: &str,
774        summary: &str,
775        entity_ids: &[i64],
776        fingerprint: Option<&str>,
777    ) -> Result<i64, MemoryError> {
778        let entity_ids_json = serde_json::to_string(entity_ids)?;
779        let id: i64 = zeph_db::query_scalar(sql!(
780            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
781             VALUES (?, ?, ?, ?)
782             ON CONFLICT(name) DO UPDATE SET
783               summary = excluded.summary,
784               entity_ids = excluded.entity_ids,
785               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
786               updated_at = CURRENT_TIMESTAMP
787             RETURNING id"
788        ))
789        .bind(name)
790        .bind(summary)
791        .bind(entity_ids_json)
792        .bind(fingerprint)
793        .fetch_one(&self.pool)
794        .await?;
795        Ok(id)
796    }
797
798    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
799    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the database query fails.
804    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
805        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
806            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
807        ))
808        .fetch_all(&self.pool)
809        .await?;
810        Ok(rows.into_iter().collect())
811    }
812
813    /// Delete a single community by its primary key.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the database query fails.
818    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
819        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
820            .bind(id)
821            .execute(&self.pool)
822            .await?;
823        Ok(())
824    }
825
826    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
827    ///
828    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
829    /// ensuring the next `detect_communities` run re-summarizes the affected community.
830    ///
831    /// # Errors
832    ///
833    /// Returns an error if the database query fails.
834    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
835        zeph_db::query(sql!(
836            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
837        ))
838        .bind(id)
839        .execute(&self.pool)
840        .await?;
841        Ok(())
842    }
843
844    /// Find the first community that contains the given `entity_id`.
845    ///
846    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
847    /// table scan with per-row JSON parsing.
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the database query fails or JSON parsing fails.
852    pub async fn community_for_entity(
853        &self,
854        entity_id: i64,
855    ) -> Result<Option<Community>, MemoryError> {
856        let row: Option<CommunityRow> = zeph_db::query_as(
857            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
858             FROM graph_communities c, json_each(c.entity_ids) j
859             WHERE CAST(j.value AS INTEGER) = ?
860             LIMIT 1"),
861        )
862        .bind(entity_id)
863        .fetch_optional(&self.pool)
864        .await?;
865        match row {
866            Some(row) => {
867                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
868                Ok(Some(Community {
869                    id: row.id,
870                    name: row.name,
871                    summary: row.summary,
872                    entity_ids,
873                    fingerprint: row.fingerprint,
874                    created_at: row.created_at,
875                    updated_at: row.updated_at,
876                }))
877            }
878            None => Ok(None),
879        }
880    }
881
882    /// Get all communities.
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if the database query fails or JSON parsing fails.
887    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
888        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
889            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
890             FROM graph_communities
891             ORDER BY id ASC"
892        ))
893        .fetch_all(&self.pool)
894        .await?;
895
896        rows.into_iter()
897            .map(|row| {
898                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
899                Ok(Community {
900                    id: row.id,
901                    name: row.name,
902                    summary: row.summary,
903                    entity_ids,
904                    fingerprint: row.fingerprint,
905                    created_at: row.created_at,
906                    updated_at: row.updated_at,
907                })
908            })
909            .collect()
910    }
911
912    /// Count the total number of communities.
913    ///
914    /// # Errors
915    ///
916    /// Returns an error if the database query fails.
917    pub async fn community_count(&self) -> Result<i64, MemoryError> {
918        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
919            .fetch_one(&self.pool)
920            .await?;
921        Ok(count)
922    }
923
924    // ── Metadata ──────────────────────────────────────────────────────────────
925
926    /// Get a metadata value by key.
927    ///
928    /// # Errors
929    ///
930    /// Returns an error if the database query fails.
931    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
932        let val: Option<String> =
933            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
934                .bind(key)
935                .fetch_optional(&self.pool)
936                .await?;
937        Ok(val)
938    }
939
940    /// Set a metadata value by key (upsert).
941    ///
942    /// # Errors
943    ///
944    /// Returns an error if the database query fails.
945    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
946        zeph_db::query(sql!(
947            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
948             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
949        ))
950        .bind(key)
951        .bind(value)
952        .execute(&self.pool)
953        .await?;
954        Ok(())
955    }
956
957    /// Get the current extraction count from metadata.
958    ///
959    /// Returns 0 if the counter has not been initialized.
960    ///
961    /// # Errors
962    ///
963    /// Returns an error if the database query fails.
964    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
965        let val = self.get_metadata("extraction_count").await?;
966        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
967    }
968
969    /// Stream all active (non-invalidated) edges.
970    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
971        use futures::StreamExt as _;
972        zeph_db::query_as::<_, EdgeRow>(sql!(
973            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
974                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
975                    edge_type, retrieval_count, last_retrieved_at, superseded_by
976             FROM graph_edges
977             WHERE valid_to IS NULL
978             ORDER BY id ASC"
979        ))
980        .fetch(&self.pool)
981        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
982    }
983
984    /// Fetch a chunk of active edges using keyset pagination.
985    ///
986    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
987    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
988    /// the returned chunk as `after_id` for the next page. An empty result means all
989    /// edges have been consumed.
990    ///
991    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
992    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
993    /// and will appear in subsequent chunks or after the last chunk, never causing
994    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
995    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the database query fails.
1000    pub async fn edges_after_id(
1001        &self,
1002        after_id: i64,
1003        limit: i64,
1004    ) -> Result<Vec<Edge>, MemoryError> {
1005        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1006            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1007                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1008                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1009             FROM graph_edges
1010             WHERE valid_to IS NULL AND id > ?
1011             ORDER BY id ASC
1012             LIMIT ?"
1013        ))
1014        .bind(after_id)
1015        .bind(limit)
1016        .fetch_all(&self.pool)
1017        .await?;
1018        Ok(rows.into_iter().map(edge_from_row).collect())
1019    }
1020
1021    /// Find a community by its primary key.
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns an error if the database query fails or JSON parsing fails.
1026    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1027        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1028            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1029             FROM graph_communities
1030             WHERE id = ?"
1031        ))
1032        .bind(id)
1033        .fetch_optional(&self.pool)
1034        .await?;
1035        match row {
1036            Some(row) => {
1037                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1038                Ok(Some(Community {
1039                    id: row.id,
1040                    name: row.name,
1041                    summary: row.summary,
1042                    entity_ids,
1043                    fingerprint: row.fingerprint,
1044                    created_at: row.created_at,
1045                    updated_at: row.updated_at,
1046                }))
1047            }
1048            None => Ok(None),
1049        }
1050    }
1051
1052    /// Delete all communities (full rebuild before upsert).
1053    ///
1054    /// # Errors
1055    ///
1056    /// Returns an error if the database query fails.
1057    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1058        zeph_db::query(sql!("DELETE FROM graph_communities"))
1059            .execute(&self.pool)
1060            .await?;
1061        Ok(())
1062    }
1063
1064    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1065
1066    /// Find entities matching `query` and return them with normalized FTS5 scores.
1067    ///
1068    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1069    /// by dividing each negated BM25 value by the maximum in the result set.
1070    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1071    ///
1072    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1073    ///
1074    /// # Errors
1075    ///
1076    /// Returns an error if the database query fails.
1077    #[allow(clippy::too_many_lines)]
1078    pub async fn find_entities_ranked(
1079        &self,
1080        query: &str,
1081        limit: usize,
1082    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1083        // Row type for UNION ALL FTS5 query: (id, name, canonical_name, entity_type,
1084        // summary, first_seen_at, last_seen_at, qdrant_point_id, fts_rank).
1085        type EntityFtsRow = (
1086            i64,
1087            String,
1088            String,
1089            String,
1090            Option<String>,
1091            String,
1092            String,
1093            Option<String>,
1094            f64,
1095        );
1096
1097        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
1098        let query = &query[..query.floor_char_boundary(512)];
1099        let sanitized = sanitize_fts_query(query);
1100        if sanitized.is_empty() {
1101            return Ok(vec![]);
1102        }
1103        let fts_query: String = sanitized
1104            .split_whitespace()
1105            .filter(|t| !FTS5_OPERATORS.contains(t))
1106            .map(|t| format!("{t}*"))
1107            .collect::<Vec<_>>()
1108            .join(" ");
1109        if fts_query.is_empty() {
1110            return Ok(vec![]);
1111        }
1112
1113        let limit_i64 = i64::try_from(limit)?;
1114
1115        // UNION ALL with outer ORDER BY preserves FTS5 BM25 ordering through LIMIT.
1116        // Alias matches get a fixed raw score of 0.5 (below any real BM25 match).
1117        let ranked_fts_sql = format!(
1118            "SELECT * FROM ( \
1119                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1120                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1121                        -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
1122                 FROM graph_entities_fts fts \
1123                 JOIN graph_entities e ON e.id = fts.rowid \
1124                 WHERE graph_entities_fts MATCH ? \
1125                 UNION ALL \
1126                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1127                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1128                        0.5 AS fts_rank \
1129                 FROM graph_entity_aliases a \
1130                 JOIN graph_entities e ON e.id = a.entity_id \
1131                 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
1132             ) \
1133             ORDER BY fts_rank DESC \
1134             LIMIT ?",
1135            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1136        );
1137        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1138            .bind(&fts_query)
1139            .bind(format!(
1140                "%{}%",
1141                query
1142                    .trim()
1143                    .replace('\\', "\\\\")
1144                    .replace('%', "\\%")
1145                    .replace('_', "\\_")
1146            ))
1147            .bind(limit_i64)
1148            .fetch_all(&self.pool)
1149            .await?;
1150
1151        if rows.is_empty() {
1152            return Ok(vec![]);
1153        }
1154
1155        // Normalize FTS scores to [0, 1] by dividing by max; guard against div-by-zero.
1156        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
1157        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
1158
1159        // Deduplicate by entity ID (keep first/highest-ranked occurrence).
1160        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
1161        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
1162        for (
1163            id,
1164            name,
1165            canonical_name,
1166            entity_type_str,
1167            summary,
1168            first_seen_at,
1169            last_seen_at,
1170            qdrant_point_id,
1171            raw_score,
1172        ) in rows
1173        {
1174            if !seen_ids.insert(id) {
1175                continue;
1176            }
1177            let entity_type = entity_type_str
1178                .parse()
1179                .unwrap_or(super::types::EntityType::Concept);
1180            let entity = Entity {
1181                id,
1182                name,
1183                canonical_name,
1184                entity_type,
1185                summary,
1186                first_seen_at,
1187                last_seen_at,
1188                qdrant_point_id,
1189            };
1190            #[allow(clippy::cast_possible_truncation)]
1191            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
1192            result.push((entity, normalized));
1193        }
1194
1195        Ok(result)
1196    }
1197
1198    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1199    ///
1200    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1201    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1202    /// Entities with no edges receive score `0.0`.
1203    ///
1204    /// # Errors
1205    ///
1206    /// Returns an error if the database query fails.
1207    pub async fn entity_structural_scores(
1208        &self,
1209        entity_ids: &[i64],
1210    ) -> Result<HashMap<i64, f32>, MemoryError> {
1211        // Each query binds entity_ids three times (three IN clauses).
1212        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1213        const MAX_BATCH: usize = 163;
1214
1215        if entity_ids.is_empty() {
1216            return Ok(HashMap::new());
1217        }
1218
1219        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1220        for chunk in entity_ids.chunks(MAX_BATCH) {
1221            let n = chunk.len();
1222            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1223            let ph1 = placeholder_list(1, n);
1224            let ph2 = placeholder_list(n + 1, n);
1225            let ph3 = placeholder_list(n * 2 + 1, n);
1226
1227            // Build query: count degree and distinct edge types for each entity.
1228            let sql = format!(
1229                "SELECT entity_id,
1230                        COUNT(*) AS degree,
1231                        COUNT(DISTINCT edge_type) AS type_diversity
1232                 FROM (
1233                     SELECT source_entity_id AS entity_id, edge_type
1234                     FROM graph_edges
1235                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1236                     UNION ALL
1237                     SELECT target_entity_id AS entity_id, edge_type
1238                     FROM graph_edges
1239                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1240                 )
1241                 WHERE entity_id IN ({ph3})
1242                 GROUP BY entity_id"
1243            );
1244
1245            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1246            // Bind chunk three times (three IN clauses)
1247            for id in chunk {
1248                query = query.bind(*id);
1249            }
1250            for id in chunk {
1251                query = query.bind(*id);
1252            }
1253            for id in chunk {
1254                query = query.bind(*id);
1255            }
1256
1257            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1258            all_rows.extend(chunk_rows);
1259        }
1260
1261        if all_rows.is_empty() {
1262            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1263        }
1264
1265        let max_degree = all_rows
1266            .iter()
1267            .map(|(_, d, _)| *d)
1268            .max()
1269            .unwrap_or(1)
1270            .max(1);
1271
1272        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1273        for (entity_id, degree, type_diversity) in all_rows {
1274            #[allow(clippy::cast_precision_loss)]
1275            let norm_degree = degree as f32 / max_degree as f32;
1276            #[allow(clippy::cast_precision_loss)]
1277            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1278            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1279            scores.insert(entity_id, score);
1280        }
1281
1282        Ok(scores)
1283    }
1284
1285    /// Look up community IDs for a batch of entity IDs.
1286    ///
1287    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1288    /// are absent from the map (treated as `None` by callers — no community cap applied).
1289    ///
1290    /// # Errors
1291    ///
1292    /// Returns an error if the database query fails.
1293    pub async fn entity_community_ids(
1294        &self,
1295        entity_ids: &[i64],
1296    ) -> Result<HashMap<i64, i64>, MemoryError> {
1297        const MAX_BATCH: usize = 490;
1298
1299        if entity_ids.is_empty() {
1300            return Ok(HashMap::new());
1301        }
1302
1303        let mut result: HashMap<i64, i64> = HashMap::new();
1304        for chunk in entity_ids.chunks(MAX_BATCH) {
1305            let placeholders = placeholder_list(1, chunk.len());
1306
1307            let community_sql = community_ids_sql(&placeholders);
1308            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1309            for id in chunk {
1310                query = query.bind(*id);
1311            }
1312
1313            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1314            result.extend(rows);
1315        }
1316
1317        Ok(result)
1318    }
1319
1320    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1321    ///
1322    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1323    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1324    ///
1325    /// # Errors
1326    ///
1327    /// Returns an error if the database query fails.
1328    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1329        const MAX_BATCH: usize = 490;
1330        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1331        for chunk in edge_ids.chunks(MAX_BATCH) {
1332            let edge_placeholders = placeholder_list(1, chunk.len());
1333            let retrieval_sql = format!(
1334                "UPDATE graph_edges \
1335                 SET retrieval_count = retrieval_count + 1, \
1336                     last_retrieved_at = {epoch_now} \
1337                 WHERE id IN ({edge_placeholders})"
1338            );
1339            let mut q = zeph_db::query(&retrieval_sql);
1340            for id in chunk {
1341                q = q.bind(*id);
1342            }
1343            q.execute(&self.pool).await?;
1344        }
1345        Ok(())
1346    }
1347
1348    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1349    ///
1350    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1351    /// are updated. Returns the number of rows affected.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if the database query fails.
1356    pub async fn decay_edge_retrieval_counts(
1357        &self,
1358        decay_lambda: f64,
1359        interval_secs: u64,
1360    ) -> Result<usize, MemoryError> {
1361        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1362        let decay_raw = format!(
1363            "UPDATE graph_edges \
1364             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1365             WHERE valid_to IS NULL \
1366               AND retrieval_count > 0 \
1367               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1368        );
1369        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1370        let result = zeph_db::query(&decay_sql)
1371            .bind(decay_lambda)
1372            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1373            .execute(&self.pool)
1374            .await?;
1375        Ok(usize::try_from(result.rows_affected())?)
1376    }
1377
1378    /// Delete expired edges older than `retention_days` and return count deleted.
1379    ///
1380    /// # Errors
1381    ///
1382    /// Returns an error if the database query fails.
1383    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1384        let days = i64::from(retention_days);
1385        let result = zeph_db::query(sql!(
1386            "DELETE FROM graph_edges
1387             WHERE expired_at IS NOT NULL
1388               AND expired_at < datetime('now', '-' || ? || ' days')"
1389        ))
1390        .bind(days)
1391        .execute(&self.pool)
1392        .await?;
1393        Ok(usize::try_from(result.rows_affected())?)
1394    }
1395
1396    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1397    ///
1398    /// # Errors
1399    ///
1400    /// Returns an error if the database query fails.
1401    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1402        let days = i64::from(retention_days);
1403        let result = zeph_db::query(sql!(
1404            "DELETE FROM graph_entities
1405             WHERE id NOT IN (
1406                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1407                 UNION
1408                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1409             )
1410             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1411        ))
1412        .bind(days)
1413        .execute(&self.pool)
1414        .await?;
1415        Ok(usize::try_from(result.rows_affected())?)
1416    }
1417
1418    /// Delete the oldest excess entities when count exceeds `max_entities`.
1419    ///
1420    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1421    /// Only deletes when `entity_count() > max_entities`.
1422    ///
1423    /// # Errors
1424    ///
1425    /// Returns an error if the database query fails.
1426    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1427        let current = self.entity_count().await?;
1428        let max = i64::try_from(max_entities)?;
1429        if current <= max {
1430            return Ok(0);
1431        }
1432        let excess = current - max;
1433        let result = zeph_db::query(sql!(
1434            "DELETE FROM graph_entities
1435             WHERE id IN (
1436                 SELECT e.id
1437                 FROM graph_entities e
1438                 LEFT JOIN (
1439                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1440                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1441                     UNION ALL
1442                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1443                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1444                 ) edge_counts ON e.id = edge_counts.eid
1445                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1446                 LIMIT ?
1447             )"
1448        ))
1449        .bind(excess)
1450        .execute(&self.pool)
1451        .await?;
1452        Ok(usize::try_from(result.rows_affected())?)
1453    }
1454
1455    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1456
1457    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1458    ///
1459    /// An edge is valid at `timestamp` when:
1460    /// - `valid_from <= timestamp`, AND
1461    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1462    ///
1463    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1464    ///
1465    /// # Errors
1466    ///
1467    /// Returns an error if the database query fails.
1468    pub async fn edges_at_timestamp(
1469        &self,
1470        entity_id: i64,
1471        timestamp: &str,
1472    ) -> Result<Vec<Edge>, MemoryError> {
1473        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1474        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1475        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1476        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1477            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1478                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1479                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1480             FROM graph_edges
1481             WHERE valid_to IS NULL
1482               AND valid_from <= ?
1483               AND (source_entity_id = ? OR target_entity_id = ?)
1484             UNION ALL
1485             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1486                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1487                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1488             FROM graph_edges
1489             WHERE valid_to IS NOT NULL
1490               AND valid_from <= ?
1491               AND valid_to > ?
1492               AND (source_entity_id = ? OR target_entity_id = ?)"
1493        ))
1494        .bind(timestamp)
1495        .bind(entity_id)
1496        .bind(entity_id)
1497        .bind(timestamp)
1498        .bind(timestamp)
1499        .bind(entity_id)
1500        .bind(entity_id)
1501        .fetch_all(&self.pool)
1502        .await?;
1503        Ok(rows.into_iter().map(edge_from_row).collect())
1504    }
1505
1506    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1507    ///
1508    /// The optional `relation` filter restricts results to a specific relation label.
1509    /// Results are ordered by `valid_from DESC` (most recent first).
1510    ///
1511    /// # Errors
1512    ///
1513    /// Returns an error if the database query fails.
1514    pub async fn edge_history(
1515        &self,
1516        source_entity_id: i64,
1517        predicate: &str,
1518        relation: Option<&str>,
1519        limit: usize,
1520    ) -> Result<Vec<Edge>, MemoryError> {
1521        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1522        let escaped = predicate
1523            .replace('\\', "\\\\")
1524            .replace('%', "\\%")
1525            .replace('_', "\\_");
1526        let like_pattern = format!("%{escaped}%");
1527        let limit = i64::try_from(limit)?;
1528        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1529            zeph_db::query_as(sql!(
1530                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1531                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1532                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1533                 FROM graph_edges
1534                 WHERE source_entity_id = ?
1535                   AND fact LIKE ? ESCAPE '\\'
1536                   AND relation = ?
1537                 ORDER BY valid_from DESC
1538                 LIMIT ?"
1539            ))
1540            .bind(source_entity_id)
1541            .bind(&like_pattern)
1542            .bind(rel)
1543            .bind(limit)
1544            .fetch_all(&self.pool)
1545            .await?
1546        } else {
1547            zeph_db::query_as(sql!(
1548                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1549                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1550                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1551                 FROM graph_edges
1552                 WHERE source_entity_id = ?
1553                   AND fact LIKE ? ESCAPE '\\'
1554                 ORDER BY valid_from DESC
1555                 LIMIT ?"
1556            ))
1557            .bind(source_entity_id)
1558            .bind(&like_pattern)
1559            .bind(limit)
1560            .fetch_all(&self.pool)
1561            .await?
1562        };
1563        Ok(rows.into_iter().map(edge_from_row).collect())
1564    }
1565
1566    // ── BFS Traversal ─────────────────────────────────────────────────────────
1567
1568    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1569    ///
1570    /// Returns all reachable entities and the active edges connecting them.
1571    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1572    /// of `SQLite` CTE limitations.
1573    ///
1574    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1575    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1576    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1577    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1578    ///
1579    /// # Errors
1580    ///
1581    /// Returns an error if any database query fails.
1582    pub async fn bfs(
1583        &self,
1584        start_entity_id: i64,
1585        max_hops: u32,
1586    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1587        self.bfs_with_depth(start_entity_id, max_hops)
1588            .await
1589            .map(|(e, ed, _)| (e, ed))
1590    }
1591
1592    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1593    ///
1594    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1595    /// entity. The start entity itself has depth 0.
1596    ///
1597    /// **`SQLite` bind parameter limit**: see [`Self::bfs`] for notes on frontier size limits.
1598    ///
1599    /// # Errors
1600    ///
1601    /// Returns an error if any database query fails.
1602    pub async fn bfs_with_depth(
1603        &self,
1604        start_entity_id: i64,
1605        max_hops: u32,
1606    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1607        self.bfs_core(start_entity_id, max_hops, None).await
1608    }
1609
1610    /// BFS traversal considering only edges that were valid at `timestamp`.
1611    ///
1612    /// Equivalent to [`Self::bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1613    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1614    ///
1615    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1616    ///
1617    /// # Errors
1618    ///
1619    /// Returns an error if any database query fails.
1620    pub async fn bfs_at_timestamp(
1621        &self,
1622        start_entity_id: i64,
1623        max_hops: u32,
1624        timestamp: &str,
1625    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1626        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1627            .await
1628    }
1629
1630    /// BFS traversal scoped to specific MAGMA edge types.
1631    ///
1632    /// When `edge_types` is empty, behaves identically to [`Self::bfs_with_depth`] (traverses all
1633    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1634    /// matches one of the provided types.
1635    ///
1636    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1637    /// edges, a temporal query only temporal + semantic edges, etc.
1638    ///
1639    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1640    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1641    ///
1642    /// # Errors
1643    ///
1644    /// Returns an error if any database query fails.
1645    pub async fn bfs_typed(
1646        &self,
1647        start_entity_id: i64,
1648        max_hops: u32,
1649        edge_types: &[EdgeType],
1650    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1651        if edge_types.is_empty() {
1652            return self.bfs_with_depth(start_entity_id, max_hops).await;
1653        }
1654        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1655            .await
1656    }
1657
1658    /// Shared BFS implementation.
1659    ///
1660    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1661    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1662    ///
1663    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1664    /// format string, so there is no SQL injection risk.
1665    async fn bfs_core(
1666        &self,
1667        start_entity_id: i64,
1668        max_hops: u32,
1669        at_timestamp: Option<&str>,
1670    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1671        use std::collections::HashMap;
1672
1673        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1674        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1675        const MAX_FRONTIER: usize = 300;
1676
1677        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1678        let mut frontier: Vec<i64> = vec![start_entity_id];
1679        depth_map.insert(start_entity_id, 0);
1680
1681        for hop in 0..max_hops {
1682            if frontier.is_empty() {
1683                break;
1684            }
1685            frontier.truncate(MAX_FRONTIER);
1686            // IDs come from our own DB — no user input, no injection risk.
1687            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1688            // Timestamp (if any) follows at position 3n+1.
1689            let n = frontier.len();
1690            let ph1 = placeholder_list(1, n);
1691            let ph2 = placeholder_list(n + 1, n);
1692            let ph3 = placeholder_list(n * 2 + 1, n);
1693            let edge_filter = if at_timestamp.is_some() {
1694                let ts_pos = n * 3 + 1;
1695                format!(
1696                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1697                    ts = numbered_placeholder(ts_pos),
1698                )
1699            } else {
1700                "valid_to IS NULL".to_owned()
1701            };
1702            let neighbour_sql = format!(
1703                "SELECT DISTINCT CASE
1704                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1705                     ELSE source_entity_id
1706                 END as neighbour_id
1707                 FROM graph_edges
1708                 WHERE {edge_filter}
1709                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1710            );
1711            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1712            for id in &frontier {
1713                q = q.bind(*id);
1714            }
1715            for id in &frontier {
1716                q = q.bind(*id);
1717            }
1718            for id in &frontier {
1719                q = q.bind(*id);
1720            }
1721            if let Some(ts) = at_timestamp {
1722                q = q.bind(ts);
1723            }
1724            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1725            let mut next_frontier: Vec<i64> = Vec::new();
1726            for nbr in neighbours {
1727                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1728                    e.insert(hop + 1);
1729                    next_frontier.push(nbr);
1730                }
1731            }
1732            frontier = next_frontier;
1733        }
1734
1735        self.bfs_fetch_results(depth_map, at_timestamp).await
1736    }
1737
1738    /// BFS implementation scoped to specific edge types.
1739    ///
1740    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1741    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1742    ///
1743    /// # Errors
1744    ///
1745    /// Returns an error if any database query fails.
1746    async fn bfs_core_typed(
1747        &self,
1748        start_entity_id: i64,
1749        max_hops: u32,
1750        at_timestamp: Option<&str>,
1751        edge_types: &[EdgeType],
1752    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1753        use std::collections::HashMap;
1754
1755        const MAX_FRONTIER: usize = 300;
1756
1757        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1758
1759        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1760        let mut frontier: Vec<i64> = vec![start_entity_id];
1761        depth_map.insert(start_entity_id, 0);
1762
1763        let n_types = type_strs.len();
1764        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1765        let type_in = placeholder_list(1, n_types);
1766        let id_start = n_types + 1;
1767
1768        for hop in 0..max_hops {
1769            if frontier.is_empty() {
1770                break;
1771            }
1772            frontier.truncate(MAX_FRONTIER);
1773
1774            let n_frontier = frontier.len();
1775            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1776            let fp1 = placeholder_list(id_start, n_frontier);
1777            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1778            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1779
1780            let edge_filter = if at_timestamp.is_some() {
1781                let ts_pos = id_start + n_frontier * 3;
1782                format!(
1783                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1784                    ts = numbered_placeholder(ts_pos),
1785                )
1786            } else {
1787                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1788            };
1789
1790            let neighbour_sql = format!(
1791                "SELECT DISTINCT CASE
1792                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1793                     ELSE source_entity_id
1794                 END as neighbour_id
1795                 FROM graph_edges
1796                 WHERE {edge_filter}
1797                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1798            );
1799
1800            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1801            // Bind types first
1802            for t in &type_strs {
1803                q = q.bind(*t);
1804            }
1805            // Bind frontier 3 times
1806            for id in &frontier {
1807                q = q.bind(*id);
1808            }
1809            for id in &frontier {
1810                q = q.bind(*id);
1811            }
1812            for id in &frontier {
1813                q = q.bind(*id);
1814            }
1815            if let Some(ts) = at_timestamp {
1816                q = q.bind(ts);
1817            }
1818
1819            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1820            let mut next_frontier: Vec<i64> = Vec::new();
1821            for nbr in neighbours {
1822                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1823                    e.insert(hop + 1);
1824                    next_frontier.push(nbr);
1825                }
1826            }
1827            frontier = next_frontier;
1828        }
1829
1830        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1831        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1832            .await
1833    }
1834
1835    /// Fetch entities and typed edges for a completed BFS depth map.
1836    ///
1837    /// Filters returned edges by the provided `edge_type` strings.
1838    ///
1839    /// # Errors
1840    ///
1841    /// Returns an error if any database query fails.
1842    async fn bfs_fetch_results_typed(
1843        &self,
1844        depth_map: std::collections::HashMap<i64, u32>,
1845        at_timestamp: Option<&str>,
1846        type_strs: &[&str],
1847    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1848        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1849        if visited_ids.is_empty() {
1850            return Ok((Vec::new(), Vec::new(), depth_map));
1851        }
1852        if visited_ids.len() > 499 {
1853            tracing::warn!(
1854                total = visited_ids.len(),
1855                retained = 499,
1856                "bfs_fetch_results_typed: visited entity set truncated to 499"
1857            );
1858            visited_ids.truncate(499);
1859        }
1860
1861        let n_types = type_strs.len();
1862        let n_visited = visited_ids.len();
1863
1864        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1865        let type_in = placeholder_list(1, n_types);
1866        let id_start = n_types + 1;
1867        let ph_ids1 = placeholder_list(id_start, n_visited);
1868        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1869
1870        let edge_filter = if at_timestamp.is_some() {
1871            let ts_pos = id_start + n_visited * 2;
1872            format!(
1873                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1874                ts = numbered_placeholder(ts_pos),
1875            )
1876        } else {
1877            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1878        };
1879
1880        let edge_sql = format!(
1881            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1882                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1883                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1884             FROM graph_edges
1885             WHERE {edge_filter}
1886               AND source_entity_id IN ({ph_ids1})
1887               AND target_entity_id IN ({ph_ids2})"
1888        );
1889        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1890        for t in type_strs {
1891            edge_query = edge_query.bind(*t);
1892        }
1893        for id in &visited_ids {
1894            edge_query = edge_query.bind(*id);
1895        }
1896        for id in &visited_ids {
1897            edge_query = edge_query.bind(*id);
1898        }
1899        if let Some(ts) = at_timestamp {
1900            edge_query = edge_query.bind(ts);
1901        }
1902        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1903
1904        // For entity query, use plain sequential bind positions (no type prefix offset)
1905        let entity_sql2 = format!(
1906            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1907             FROM graph_entities WHERE id IN ({ph})",
1908            ph = placeholder_list(1, visited_ids.len()),
1909        );
1910        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1911        for id in &visited_ids {
1912            entity_query = entity_query.bind(*id);
1913        }
1914        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1915
1916        let entities: Vec<Entity> = entity_rows
1917            .into_iter()
1918            .map(entity_from_row)
1919            .collect::<Result<Vec<_>, _>>()?;
1920        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1921
1922        Ok((entities, edges, depth_map))
1923    }
1924
1925    /// Fetch entities and edges for a completed BFS depth map.
1926    async fn bfs_fetch_results(
1927        &self,
1928        depth_map: std::collections::HashMap<i64, u32>,
1929        at_timestamp: Option<&str>,
1930    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1931        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1932        if visited_ids.is_empty() {
1933            return Ok((Vec::new(), Vec::new(), depth_map));
1934        }
1935        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1936        if visited_ids.len() > 499 {
1937            tracing::warn!(
1938                total = visited_ids.len(),
1939                retained = 499,
1940                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1941                 some reachable entities will be dropped from results"
1942            );
1943            visited_ids.truncate(499);
1944        }
1945
1946        let n = visited_ids.len();
1947        let ph_ids1 = placeholder_list(1, n);
1948        let ph_ids2 = placeholder_list(n + 1, n);
1949        let edge_filter = if at_timestamp.is_some() {
1950            let ts_pos = n * 2 + 1;
1951            format!(
1952                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1953                ts = numbered_placeholder(ts_pos),
1954            )
1955        } else {
1956            "valid_to IS NULL".to_owned()
1957        };
1958        let edge_sql = format!(
1959            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1960                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1961                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1962             FROM graph_edges
1963             WHERE {edge_filter}
1964               AND source_entity_id IN ({ph_ids1})
1965               AND target_entity_id IN ({ph_ids2})"
1966        );
1967        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1968        for id in &visited_ids {
1969            edge_query = edge_query.bind(*id);
1970        }
1971        for id in &visited_ids {
1972            edge_query = edge_query.bind(*id);
1973        }
1974        if let Some(ts) = at_timestamp {
1975            edge_query = edge_query.bind(ts);
1976        }
1977        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1978
1979        let entity_sql = format!(
1980            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1981             FROM graph_entities WHERE id IN ({ph})",
1982            ph = placeholder_list(1, n),
1983        );
1984        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
1985        for id in &visited_ids {
1986            entity_query = entity_query.bind(*id);
1987        }
1988        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1989
1990        let entities: Vec<Entity> = entity_rows
1991            .into_iter()
1992            .map(entity_from_row)
1993            .collect::<Result<Vec<_>, _>>()?;
1994        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1995
1996        Ok((entities, edges, depth_map))
1997    }
1998
1999    // ── Backfill helpers ──────────────────────────────────────────────────────
2000
2001    /// Find an entity by name only (no type filter).
2002    ///
2003    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
2004    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
2005    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
2006    ///
2007    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
2008    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
2009    /// contains "Alice").
2010    ///
2011    /// # Errors
2012    ///
2013    /// Returns an error if the database query fails.
2014    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2015        let find_by_name_sql = format!(
2016            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2017             FROM graph_entities \
2018             WHERE name = ? {cn} OR canonical_name = ? {cn} \
2019             LIMIT 5",
2020            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2021        );
2022        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2023            .bind(name)
2024            .bind(name)
2025            .fetch_all(&self.pool)
2026            .await?;
2027
2028        if !rows.is_empty() {
2029            return rows.into_iter().map(entity_from_row).collect();
2030        }
2031
2032        self.find_entities_fuzzy(name, 5).await
2033    }
2034
2035    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2036    ///
2037    /// Reads the `graph_processed` column added by migration 021.
2038    ///
2039    /// # Errors
2040    ///
2041    /// Returns an error if the database query fails.
2042    pub async fn unprocessed_messages_for_backfill(
2043        &self,
2044        limit: usize,
2045    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2046        let limit = i64::try_from(limit)?;
2047        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2048            "SELECT id, content FROM messages
2049             WHERE graph_processed = 0
2050             ORDER BY id ASC
2051             LIMIT ?"
2052        ))
2053        .bind(limit)
2054        .fetch_all(&self.pool)
2055        .await?;
2056        Ok(rows
2057            .into_iter()
2058            .map(|(id, content)| (crate::types::MessageId(id), content))
2059            .collect())
2060    }
2061
2062    /// Return the count of messages not yet processed by graph extraction.
2063    ///
2064    /// # Errors
2065    ///
2066    /// Returns an error if the database query fails.
2067    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2068        let count: i64 = zeph_db::query_scalar(sql!(
2069            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2070        ))
2071        .fetch_one(&self.pool)
2072        .await?;
2073        Ok(count)
2074    }
2075
2076    /// Mark a batch of messages as graph-processed.
2077    ///
2078    /// # Errors
2079    ///
2080    /// Returns an error if the database query fails.
2081    pub async fn mark_messages_graph_processed(
2082        &self,
2083        ids: &[crate::types::MessageId],
2084    ) -> Result<(), MemoryError> {
2085        const MAX_BATCH: usize = 490;
2086        if ids.is_empty() {
2087            return Ok(());
2088        }
2089        for chunk in ids.chunks(MAX_BATCH) {
2090            let placeholders = placeholder_list(1, chunk.len());
2091            let sql =
2092                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2093            let mut query = zeph_db::query(&sql);
2094            for id in chunk {
2095                query = query.bind(id.0);
2096            }
2097            query.execute(&self.pool).await?;
2098        }
2099        Ok(())
2100    }
2101}
2102
2103// ── Dialect helpers ───────────────────────────────────────────────────────────
2104
/// Build the `SQLite` query that maps entity IDs to the communities containing them.
///
/// `placeholders` is spliced verbatim into the `IN (…)` list. The query expands each
/// community's `entity_ids` JSON array with `json_each` and yields `(entity_id, community_id)`
/// rows.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    const PREFIX: &str = "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN (";

    [PREFIX, placeholders, ")"].concat()
}
2113
/// Build the `PostgreSQL` query that maps entity IDs to the communities containing them.
///
/// `placeholders` is spliced verbatim into the `IN (…)` list. The query expands each
/// community's `entity_ids` array with `jsonb_array_elements_text` and yields
/// `(entity_id, community_id)` rows.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    const PREFIX: &str = "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN (";

    [PREFIX, placeholders, ")"].concat()
}
2123
2124// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2125
/// Raw database row for an entity, as produced by the `graph_entities` SELECT statements.
///
/// Field names match the selected column names. Converted into the domain type
/// [`Entity`] by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    /// Primary key of the entity row.
    id: i64,
    /// Value of the `name` column.
    name: String,
    /// Value of the `canonical_name` column.
    canonical_name: String,
    /// Entity type as stored text; parsed into [`EntityType`] by `entity_from_row`.
    entity_type: String,
    /// Optional summary text.
    summary: Option<String>,
    /// Value of the `first_seen_at` column, kept as text.
    first_seen_at: String,
    /// Value of the `last_seen_at` column, kept as text.
    last_seen_at: String,
    /// Optional `qdrant_point_id` — presumably the ID of the matching Qdrant point; confirm.
    qdrant_point_id: Option<String>,
}
2137
2138fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2139    let entity_type = row
2140        .entity_type
2141        .parse::<EntityType>()
2142        .map_err(MemoryError::GraphStore)?;
2143    Ok(Entity {
2144        id: row.id,
2145        name: row.name,
2146        canonical_name: row.canonical_name,
2147        entity_type,
2148        summary: row.summary,
2149        first_seen_at: row.first_seen_at,
2150        last_seen_at: row.last_seen_at,
2151        qdrant_point_id: row.qdrant_point_id,
2152    })
2153}
2154
/// Raw database row for an entity alias.
///
/// Converted into the domain type [`EntityAlias`] by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    /// Primary key of the alias row.
    id: i64,
    /// ID of the entity this alias belongs to.
    entity_id: i64,
    /// The alias text.
    alias_name: String,
    /// Value of the `created_at` column, kept as text.
    created_at: String,
}
2162
2163fn alias_from_row(row: AliasRow) -> EntityAlias {
2164    EntityAlias {
2165        id: row.id,
2166        entity_id: row.entity_id,
2167        alias_name: row.alias_name,
2168        created_at: row.created_at,
2169    }
2170}
2171
/// Raw database row for an edge, as produced by the `graph_edges` SELECT statements.
///
/// Field names match the selected column names, except where renamed below. Converted into
/// the domain type [`Edge`] by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    /// Primary key of the edge row.
    id: i64,
    /// ID of the entity the edge starts from.
    source_entity_id: i64,
    /// ID of the entity the edge points to.
    target_entity_id: i64,
    /// Relation label of the edge.
    relation: String,
    /// Fact text attached to the edge.
    fact: String,
    /// Confidence as read from the database; narrowed to `f32` by `edge_from_row`.
    confidence: f64,
    /// Start of the edge's validity interval, kept as text.
    valid_from: String,
    /// End of the edge's validity interval; the BFS queries treat `NULL` as still active.
    valid_to: Option<String>,
    /// Value of the `created_at` column, kept as text.
    created_at: String,
    /// Value of the `expired_at` column, if set.
    expired_at: Option<String>,
    /// Read from the `episode_id` column; wrapped in [`MessageId`] by `edge_from_row`.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    /// Optional `qdrant_point_id` — presumably the ID of the matching Qdrant point; confirm.
    qdrant_point_id: Option<String>,
    /// Edge type as stored text; parsed into [`EdgeType`] by `edge_from_row`.
    edge_type: String,
    /// Value of the `retrieval_count` column.
    retrieval_count: i32,
    /// Value of the `last_retrieved_at` column — presumably a Unix timestamp; confirm.
    last_retrieved_at: Option<i64>,
    /// Value of the `superseded_by` column — presumably the ID of a newer edge; confirm.
    superseded_by: Option<i64>,
}
2192
2193fn edge_from_row(row: EdgeRow) -> Edge {
2194    let edge_type = row
2195        .edge_type
2196        .parse::<EdgeType>()
2197        .unwrap_or(EdgeType::Semantic);
2198    Edge {
2199        id: row.id,
2200        source_entity_id: row.source_entity_id,
2201        target_entity_id: row.target_entity_id,
2202        relation: row.relation,
2203        fact: row.fact,
2204        #[allow(clippy::cast_possible_truncation)]
2205        confidence: row.confidence as f32,
2206        valid_from: row.valid_from,
2207        valid_to: row.valid_to,
2208        created_at: row.created_at,
2209        expired_at: row.expired_at,
2210        source_message_id: row.source_message_id.map(MessageId),
2211        qdrant_point_id: row.qdrant_point_id,
2212        edge_type,
2213        retrieval_count: row.retrieval_count,
2214        last_retrieved_at: row.last_retrieved_at,
2215        superseded_by: row.superseded_by,
2216    }
2217}
2218
/// Raw database row for a community.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    /// Primary key of the community row.
    id: i64,
    /// Community name.
    name: String,
    /// Community summary text.
    summary: String,
    /// Member entity IDs as stored text; `community_ids_sql` reads this column as a JSON array.
    entity_ids: String,
    /// Optional fingerprint — presumably used to detect membership changes; confirm.
    fingerprint: Option<String>,
    /// Value of the `created_at` column, kept as text.
    created_at: String,
    /// Value of the `updated_at` column, kept as text.
    updated_at: String,
}
2229
2230// ── GAAMA Episode methods ──────────────────────────────────────────────────────
2231
2232impl GraphStore {
2233    /// Ensure a GAAMA episode exists for this conversation, returning its ID.
2234    ///
2235    /// Idempotent: inserts on first call, returns existing ID on subsequent calls.
2236    ///
2237    /// # Errors
2238    ///
2239    /// Returns an error if the database query fails.
2240    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2241        // Ensure the conversation row exists before inserting into graph_episodes,
2242        // which has a FK referencing conversations(id). On a fresh database the agent
2243        // may run graph extraction before the conversation row is committed.
2244        zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2245            .bind(conversation_id)
2246            .execute(&self.pool)
2247            .await?;
2248
2249        let id: i64 = zeph_db::query_scalar(sql!(
2250            "INSERT INTO graph_episodes (conversation_id)
2251             VALUES (?)
2252             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2253             RETURNING id"
2254        ))
2255        .bind(conversation_id)
2256        .fetch_one(&self.pool)
2257        .await?;
2258        Ok(id)
2259    }
2260
2261    /// Record that an entity was observed in an episode.
2262    ///
2263    /// Idempotent: does nothing if the link already exists.
2264    ///
2265    /// # Errors
2266    ///
2267    /// Returns an error if the database query fails.
2268    pub async fn link_entity_to_episode(
2269        &self,
2270        episode_id: i64,
2271        entity_id: i64,
2272    ) -> Result<(), MemoryError> {
2273        zeph_db::query(sql!(
2274            "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2275             VALUES (?, ?)"
2276        ))
2277        .bind(episode_id)
2278        .bind(entity_id)
2279        .execute(&self.pool)
2280        .await?;
2281        Ok(())
2282    }
2283
2284    /// Return all episodes in which an entity appears.
2285    ///
2286    /// # Errors
2287    ///
2288    /// Returns an error if the database query fails.
2289    pub async fn episodes_for_entity(
2290        &self,
2291        entity_id: i64,
2292    ) -> Result<Vec<super::types::Episode>, MemoryError> {
2293        #[derive(zeph_db::FromRow)]
2294        struct EpisodeRow {
2295            id: i64,
2296            conversation_id: i64,
2297            created_at: String,
2298            closed_at: Option<String>,
2299        }
2300        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2301            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2302             FROM graph_episodes e
2303             JOIN graph_episode_entities ee ON ee.episode_id = e.id
2304             WHERE ee.entity_id = ?"
2305        ))
2306        .bind(entity_id)
2307        .fetch_all(&self.pool)
2308        .await?;
2309        Ok(rows
2310            .into_iter()
2311            .map(|r| super::types::Episode {
2312                id: r.id,
2313                conversation_id: r.conversation_id,
2314                created_at: r.created_at,
2315                closed_at: r.closed_at,
2316            })
2317            .collect())
2318    }
2319}
2320
2321// ── Tests ─────────────────────────────────────────────────────────────────────
2322
2323#[cfg(test)]
2324mod tests;