Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use futures::Stream;
9use zeph_db::fts::sanitize_fts_query;
10use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
11
12use crate::error::MemoryError;
13use crate::types::MessageId;
14
15use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
16
/// Database-backed store for the knowledge graph.
///
/// Wraps a [`DbPool`]; the methods on this type issue their SQL against that pool, touching
/// the `graph_entities`, `graph_entity_aliases`, `graph_edges` and `graph_communities` tables.
pub struct GraphStore {
    // Connection pool every query in this store runs against.
    pool: DbPool,
}
20
21impl GraphStore {
22    #[must_use]
23    pub fn new(pool: DbPool) -> Self {
24        Self { pool }
25    }
26
27    #[must_use]
28    pub fn pool(&self) -> &DbPool {
29        &self.pool
30    }
31
32    // ── Entities ─────────────────────────────────────────────────────────────
33
34    /// Insert or update an entity by `(canonical_name, entity_type)`.
35    ///
36    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
37    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
38    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
39    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the database query fails.
44    pub async fn upsert_entity(
45        &self,
46        surface_name: &str,
47        canonical_name: &str,
48        entity_type: EntityType,
49        summary: Option<&str>,
50    ) -> Result<i64, MemoryError> {
51        let type_str = entity_type.as_str();
52        let id: i64 = zeph_db::query_scalar(sql!(
53            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
54             VALUES (?, ?, ?, ?)
55             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
56               name = excluded.name,
57               summary = COALESCE(excluded.summary, summary),
58               last_seen_at = CURRENT_TIMESTAMP
59             RETURNING id"
60        ))
61        .bind(surface_name)
62        .bind(canonical_name)
63        .bind(type_str)
64        .bind(summary)
65        .fetch_one(&self.pool)
66        .await?;
67        Ok(id)
68    }
69
70    /// Find an entity by exact canonical name and type.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the database query fails.
75    pub async fn find_entity(
76        &self,
77        canonical_name: &str,
78        entity_type: EntityType,
79    ) -> Result<Option<Entity>, MemoryError> {
80        let type_str = entity_type.as_str();
81        let row: Option<EntityRow> = zeph_db::query_as(
82            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
83             FROM graph_entities
84             WHERE canonical_name = ? AND entity_type = ?"),
85        )
86        .bind(canonical_name)
87        .bind(type_str)
88        .fetch_optional(&self.pool)
89        .await?;
90        row.map(entity_from_row).transpose()
91    }
92
93    /// Find an entity by its numeric ID.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the database query fails.
98    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
99        let row: Option<EntityRow> = zeph_db::query_as(
100            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
101             FROM graph_entities
102             WHERE id = ?"),
103        )
104        .bind(entity_id)
105        .fetch_optional(&self.pool)
106        .await?;
107        row.map(entity_from_row).transpose()
108    }
109
110    /// Update the `qdrant_point_id` for an entity.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the database query fails.
115    pub async fn set_entity_qdrant_point_id(
116        &self,
117        entity_id: i64,
118        point_id: &str,
119    ) -> Result<(), MemoryError> {
120        zeph_db::query(sql!(
121            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
122        ))
123        .bind(point_id)
124        .bind(entity_id)
125        .execute(&self.pool)
126        .await?;
127        Ok(())
128    }
129
130    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
131    ///
132    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
133    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
134    /// alias matches via a UNION query.
135    ///
136    /// # Behavioral note
137    ///
138    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
139    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
140    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
141    /// mid-word substrings are not matched. This is a known trade-off for index performance.
142    ///
143    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
144    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
145    /// causes noise in practice, add a minimum length guard at the call site.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the database query fails.
150    pub async fn find_entities_fuzzy(
151        &self,
152        query: &str,
153        limit: usize,
154    ) -> Result<Vec<Entity>, MemoryError> {
155        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
156        // prevents syntax errors when user input contains them as literal search terms
157        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
158        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
159        let query = &query[..query.floor_char_boundary(512)];
160        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
161        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
162        let sanitized = sanitize_fts_query(query);
163        if sanitized.is_empty() {
164            return Ok(vec![]);
165        }
166        let fts_query: String = sanitized
167            .split_whitespace()
168            .filter(|t| !FTS5_OPERATORS.contains(t))
169            .map(|t| format!("{t}*"))
170            .collect::<Vec<_>>()
171            .join(" ");
172        if fts_query.is_empty() {
173            return Ok(vec![]);
174        }
175
176        let limit = i64::try_from(limit)?;
177        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
178        // bm25() returns negative values; ORDER BY ASC puts best matches first.
179        let search_sql = format!(
180            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
181                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
182             FROM graph_entities_fts fts \
183             JOIN graph_entities e ON e.id = fts.rowid \
184             WHERE graph_entities_fts MATCH ? \
185             UNION \
186             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
187                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
188             FROM graph_entity_aliases a \
189             JOIN graph_entities e ON e.id = a.entity_id \
190             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
191             LIMIT ?",
192            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
193        );
194        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
195            .bind(&fts_query)
196            .bind(format!(
197                "%{}%",
198                query
199                    .trim()
200                    .replace('\\', "\\\\")
201                    .replace('%', "\\%")
202                    .replace('_', "\\_")
203            ))
204            .bind(limit)
205            .fetch_all(&self.pool)
206            .await?;
207        rows.into_iter()
208            .map(entity_from_row)
209            .collect::<Result<Vec<_>, _>>()
210    }
211
212    /// Flush the `SQLite` WAL to the main database file.
213    ///
214    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
215    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
216    /// visible to connections opened in future sessions.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the PRAGMA execution fails.
221    #[cfg(feature = "sqlite")]
222    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
223        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
224            .execute(&self.pool)
225            .await?;
226        Ok(())
227    }
228
229    /// No-op on `PostgreSQL` (WAL management is handled by the server).
230    ///
231    /// # Errors
232    ///
233    /// Never returns an error.
234    #[cfg(feature = "postgres")]
235    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
236        Ok(())
237    }
238
239    /// Stream all entities from the database incrementally (true cursor, no full-table load).
240    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
241        use futures::StreamExt as _;
242        zeph_db::query_as::<_, EntityRow>(
243            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
244             FROM graph_entities ORDER BY id ASC"),
245        )
246        .fetch(&self.pool)
247        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
248            r.map_err(MemoryError::from).and_then(entity_from_row)
249        })
250    }
251
252    // ── Alias methods ─────────────────────────────────────────────────────────
253
254    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the database query fails.
259    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
260        let insert_alias_sql = format!(
261            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
262            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
263            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
264        );
265        zeph_db::query(&insert_alias_sql)
266            .bind(entity_id)
267            .bind(alias_name)
268            .execute(&self.pool)
269            .await?;
270        Ok(())
271    }
272
273    /// Find an entity by alias name and entity type (case-insensitive).
274    ///
275    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the database query fails.
280    pub async fn find_entity_by_alias(
281        &self,
282        alias_name: &str,
283        entity_type: EntityType,
284    ) -> Result<Option<Entity>, MemoryError> {
285        let type_str = entity_type.as_str();
286        let alias_typed_sql = format!(
287            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
288                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
289             FROM graph_entity_aliases a \
290             JOIN graph_entities e ON e.id = a.entity_id \
291             WHERE a.alias_name = ? {} \
292               AND e.entity_type = ? \
293             ORDER BY e.id ASC \
294             LIMIT 1",
295            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
296        );
297        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
298            .bind(alias_name)
299            .bind(type_str)
300            .fetch_optional(&self.pool)
301            .await?;
302        row.map(entity_from_row).transpose()
303    }
304
305    /// Get all aliases for an entity.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the database query fails.
310    pub async fn aliases_for_entity(
311        &self,
312        entity_id: i64,
313    ) -> Result<Vec<EntityAlias>, MemoryError> {
314        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
315            "SELECT id, entity_id, alias_name, created_at
316             FROM graph_entity_aliases
317             WHERE entity_id = ?
318             ORDER BY id ASC"
319        ))
320        .bind(entity_id)
321        .fetch_all(&self.pool)
322        .await?;
323        Ok(rows.into_iter().map(alias_from_row).collect())
324    }
325
326    /// Collect all entities into a Vec.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the database query fails or `entity_type` parsing fails.
331    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
332        use futures::TryStreamExt as _;
333        self.all_entities_stream().try_collect().await
334    }
335
336    /// Count the total number of entities.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the database query fails.
341    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
342        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
343            .fetch_one(&self.pool)
344            .await?;
345        Ok(count)
346    }
347
348    // ── Edges ─────────────────────────────────────────────────────────────────
349
350    /// Insert a new edge between two entities, or update the existing active edge.
351    ///
352    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
353    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
354    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
355    /// duplicate edges from repeated extraction of the same context messages.
356    ///
357    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
358    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
359    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
360    /// update the first and lose the type classification.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the database query fails.
365    pub async fn insert_edge(
366        &self,
367        source_entity_id: i64,
368        target_entity_id: i64,
369        relation: &str,
370        fact: &str,
371        confidence: f32,
372        episode_id: Option<MessageId>,
373    ) -> Result<i64, MemoryError> {
374        self.insert_edge_typed(
375            source_entity_id,
376            target_entity_id,
377            relation,
378            fact,
379            confidence,
380            episode_id,
381            EdgeType::Semantic,
382        )
383        .await
384    }
385
386    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
387    ///
388    /// Identical semantics to [`insert_edge`] but with an explicit `edge_type` parameter.
389    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the database query fails.
394    #[allow(clippy::too_many_arguments)]
395    pub async fn insert_edge_typed(
396        &self,
397        source_entity_id: i64,
398        target_entity_id: i64,
399        relation: &str,
400        fact: &str,
401        confidence: f32,
402        episode_id: Option<MessageId>,
403        edge_type: EdgeType,
404    ) -> Result<i64, MemoryError> {
405        if source_entity_id == target_entity_id {
406            return Err(MemoryError::InvalidInput(format!(
407                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
408            )));
409        }
410        let confidence = confidence.clamp(0.0, 1.0);
411        let edge_type_str = edge_type.as_str();
412
413        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
414        // between existence check and write. The unique partial index uq_graph_edges_active
415        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
416        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
417        let mut tx = zeph_db::begin(&self.pool).await?;
418
419        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
420            "SELECT id, confidence FROM graph_edges
421             WHERE source_entity_id = ?
422               AND target_entity_id = ?
423               AND relation = ?
424               AND edge_type = ?
425               AND valid_to IS NULL
426             LIMIT 1"
427        ))
428        .bind(source_entity_id)
429        .bind(target_entity_id)
430        .bind(relation)
431        .bind(edge_type_str)
432        .fetch_optional(&mut *tx)
433        .await?;
434
435        if let Some((existing_id, stored_conf)) = existing {
436            let updated_conf = f64::from(confidence).max(stored_conf);
437            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
438                .bind(updated_conf)
439                .bind(existing_id)
440                .execute(&mut *tx)
441                .await?;
442            tx.commit().await?;
443            return Ok(existing_id);
444        }
445
446        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
447        let id: i64 = zeph_db::query_scalar(sql!(
448            "INSERT INTO graph_edges
449             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
450             VALUES (?, ?, ?, ?, ?, ?, ?)
451             RETURNING id"
452        ))
453        .bind(source_entity_id)
454        .bind(target_entity_id)
455        .bind(relation)
456        .bind(fact)
457        .bind(f64::from(confidence))
458        .bind(episode_raw)
459        .bind(edge_type_str)
460        .fetch_one(&mut *tx)
461        .await?;
462        tx.commit().await?;
463        Ok(id)
464    }
465
466    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the database update fails.
471    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
472        zeph_db::query(sql!(
473            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
474             WHERE id = ?"
475        ))
476        .bind(edge_id)
477        .execute(&self.pool)
478        .await?;
479        Ok(())
480    }
481
482    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
483    ///
484    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the database update fails.
489    pub async fn invalidate_edge_with_supersession(
490        &self,
491        old_edge_id: i64,
492        new_edge_id: i64,
493    ) -> Result<(), MemoryError> {
494        zeph_db::query(sql!(
495            "UPDATE graph_edges
496             SET valid_to = CURRENT_TIMESTAMP,
497                 expired_at = CURRENT_TIMESTAMP,
498                 superseded_by = ?
499             WHERE id = ?"
500        ))
501        .bind(new_edge_id)
502        .bind(old_edge_id)
503        .execute(&self.pool)
504        .await?;
505        Ok(())
506    }
507
508    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
509    ///
510    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
511    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
512    /// in v1 — see `bfs_at_timestamp` for historical traversal).
513    ///
514    /// # `SQLite` bind limit safety
515    ///
516    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
517    /// default). Each entity ID requires two bind slots (source OR target), so batches are
518    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
519    /// compile-time `SQLite` configuration.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the database query fails.
524    pub async fn edges_for_entities(
525        &self,
526        entity_ids: &[i64],
527        edge_types: &[super::types::EdgeType],
528    ) -> Result<Vec<Edge>, MemoryError> {
529        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
530        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
531        // 490 * 2 = 980, leaving headroom for future query additions.
532        const MAX_BATCH_ENTITIES: usize = 490;
533
534        let mut all_edges: Vec<Edge> = Vec::new();
535
536        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
537            let edges = self.query_batch_edges(chunk, edge_types).await?;
538            all_edges.extend(edges);
539        }
540
541        Ok(all_edges)
542    }
543
544    /// Query active edges for a single chunk of entity IDs (internal helper).
545    ///
546    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
547    async fn query_batch_edges(
548        &self,
549        entity_ids: &[i64],
550        edge_types: &[super::types::EdgeType],
551    ) -> Result<Vec<Edge>, MemoryError> {
552        if entity_ids.is_empty() {
553            return Ok(Vec::new());
554        }
555
556        // Build a parameterized IN clause with backend-appropriate placeholders.
557        // We cannot use the sql! macro here because the placeholder count is dynamic.
558        let n_ids = entity_ids.len();
559        let n_types = edge_types.len();
560
561        let sql = if n_types == 0 {
562            // placeholders used twice (source IN and target IN)
563            let placeholders = placeholder_list(1, n_ids);
564            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
565            format!(
566                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
567                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
568                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
569                 FROM graph_edges
570                 WHERE valid_to IS NULL
571                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
572            )
573        } else {
574            let placeholders = placeholder_list(1, n_ids);
575            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
576            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
577            format!(
578                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
579                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
580                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
581                 FROM graph_edges
582                 WHERE valid_to IS NULL
583                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
584                   AND edge_type IN ({type_placeholders})"
585            )
586        };
587
588        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
589        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
590        for id in entity_ids {
591            query = query.bind(*id);
592        }
593        for id in entity_ids {
594            query = query.bind(*id);
595        }
596        for et in edge_types {
597            query = query.bind(et.as_str());
598        }
599
600        // Wrap pool.acquire() + query execution in a short timeout to prevent the outer
601        // tokio::time::timeout (in SemanticMemory recall) from cancelling a mid-acquire
602        // future, which causes sqlx 0.8 semaphore count drift and permanent pool starvation.
603        let rows: Vec<EdgeRow> = tokio::time::timeout(
604            std::time::Duration::from_millis(500),
605            query.fetch_all(&self.pool),
606        )
607        .await
608        .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
609        Ok(rows.into_iter().map(edge_from_row).collect())
610    }
611
612    /// Get all active edges where entity is source or target.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if the database query fails.
617    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
618        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
619            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
620                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
621                    edge_type, retrieval_count, last_retrieved_at, superseded_by
622             FROM graph_edges
623             WHERE valid_to IS NULL
624               AND (source_entity_id = ? OR target_entity_id = ?)"
625        ))
626        .bind(entity_id)
627        .bind(entity_id)
628        .fetch_all(&self.pool)
629        .await?;
630        Ok(rows.into_iter().map(edge_from_row).collect())
631    }
632
633    /// Get all edges (active and expired) where entity is source or target, ordered by
634    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if the database query fails or if `limit` overflows `i64`.
639    pub async fn edge_history_for_entity(
640        &self,
641        entity_id: i64,
642        limit: usize,
643    ) -> Result<Vec<Edge>, MemoryError> {
644        let limit = i64::try_from(limit)?;
645        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
646            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
647                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
648                    edge_type, retrieval_count, last_retrieved_at, superseded_by
649             FROM graph_edges
650             WHERE source_entity_id = ? OR target_entity_id = ?
651             ORDER BY valid_from DESC
652             LIMIT ?"
653        ))
654        .bind(entity_id)
655        .bind(entity_id)
656        .bind(limit)
657        .fetch_all(&self.pool)
658        .await?;
659        Ok(rows.into_iter().map(edge_from_row).collect())
660    }
661
662    /// Get all active edges between two entities (both directions).
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the database query fails.
667    pub async fn edges_between(
668        &self,
669        entity_a: i64,
670        entity_b: i64,
671    ) -> Result<Vec<Edge>, MemoryError> {
672        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
673            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
674                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
675                    edge_type, retrieval_count, last_retrieved_at, superseded_by
676             FROM graph_edges
677             WHERE valid_to IS NULL
678               AND ((source_entity_id = ? AND target_entity_id = ?)
679                 OR (source_entity_id = ? AND target_entity_id = ?))"
680        ))
681        .bind(entity_a)
682        .bind(entity_b)
683        .bind(entity_b)
684        .bind(entity_a)
685        .fetch_all(&self.pool)
686        .await?;
687        Ok(rows.into_iter().map(edge_from_row).collect())
688    }
689
690    /// Get active edges from `source` to `target` in the exact direction (no reverse).
691    ///
692    /// # Errors
693    ///
694    /// Returns an error if the database query fails.
695    pub async fn edges_exact(
696        &self,
697        source_entity_id: i64,
698        target_entity_id: i64,
699    ) -> Result<Vec<Edge>, MemoryError> {
700        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
701            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
702                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
703                    edge_type, retrieval_count, last_retrieved_at, superseded_by
704             FROM graph_edges
705             WHERE valid_to IS NULL
706               AND source_entity_id = ?
707               AND target_entity_id = ?"
708        ))
709        .bind(source_entity_id)
710        .bind(target_entity_id)
711        .fetch_all(&self.pool)
712        .await?;
713        Ok(rows.into_iter().map(edge_from_row).collect())
714    }
715
716    /// Count active (non-invalidated) edges.
717    ///
718    /// # Errors
719    ///
720    /// Returns an error if the database query fails.
721    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
722        let count: i64 = zeph_db::query_scalar(sql!(
723            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
724        ))
725        .fetch_one(&self.pool)
726        .await?;
727        Ok(count)
728    }
729
730    /// Return per-type active edge counts as `(edge_type, count)` pairs.
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if the database query fails.
735    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
736        let rows: Vec<(String, i64)> = zeph_db::query_as(
737            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
738        )
739        .fetch_all(&self.pool)
740        .await?;
741        Ok(rows)
742    }
743
744    // ── Communities ───────────────────────────────────────────────────────────
745
746    /// Insert or update a community by name.
747    ///
748    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
749    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
750    /// `assign_to_community` adds an entity without a full re-detection pass).
751    ///
752    /// # Errors
753    ///
754    /// Returns an error if the database query fails or JSON serialization fails.
755    pub async fn upsert_community(
756        &self,
757        name: &str,
758        summary: &str,
759        entity_ids: &[i64],
760        fingerprint: Option<&str>,
761    ) -> Result<i64, MemoryError> {
762        let entity_ids_json = serde_json::to_string(entity_ids)?;
763        let id: i64 = zeph_db::query_scalar(sql!(
764            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
765             VALUES (?, ?, ?, ?)
766             ON CONFLICT(name) DO UPDATE SET
767               summary = excluded.summary,
768               entity_ids = excluded.entity_ids,
769               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
770               updated_at = CURRENT_TIMESTAMP
771             RETURNING id"
772        ))
773        .bind(name)
774        .bind(summary)
775        .bind(entity_ids_json)
776        .bind(fingerprint)
777        .fetch_one(&self.pool)
778        .await?;
779        Ok(id)
780    }
781
782    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
783    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the database query fails.
788    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
789        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
790            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
791        ))
792        .fetch_all(&self.pool)
793        .await?;
794        Ok(rows.into_iter().collect())
795    }
796
797    /// Delete a single community by its primary key.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the database query fails.
802    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
803        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
804            .bind(id)
805            .execute(&self.pool)
806            .await?;
807        Ok(())
808    }
809
810    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
811    ///
812    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
813    /// ensuring the next `detect_communities` run re-summarizes the affected community.
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the database query fails.
818    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
819        zeph_db::query(sql!(
820            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
821        ))
822        .bind(id)
823        .execute(&self.pool)
824        .await?;
825        Ok(())
826    }
827
828    /// Find the first community that contains the given `entity_id`.
829    ///
830    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
831    /// table scan with per-row JSON parsing.
832    ///
833    /// # Errors
834    ///
835    /// Returns an error if the database query fails or JSON parsing fails.
836    pub async fn community_for_entity(
837        &self,
838        entity_id: i64,
839    ) -> Result<Option<Community>, MemoryError> {
840        let row: Option<CommunityRow> = zeph_db::query_as(
841            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
842             FROM graph_communities c, json_each(c.entity_ids) j
843             WHERE CAST(j.value AS INTEGER) = ?
844             LIMIT 1"),
845        )
846        .bind(entity_id)
847        .fetch_optional(&self.pool)
848        .await?;
849        match row {
850            Some(row) => {
851                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
852                Ok(Some(Community {
853                    id: row.id,
854                    name: row.name,
855                    summary: row.summary,
856                    entity_ids,
857                    fingerprint: row.fingerprint,
858                    created_at: row.created_at,
859                    updated_at: row.updated_at,
860                }))
861            }
862            None => Ok(None),
863        }
864    }
865
866    /// Get all communities.
867    ///
868    /// # Errors
869    ///
870    /// Returns an error if the database query fails or JSON parsing fails.
871    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
872        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
873            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
874             FROM graph_communities
875             ORDER BY id ASC"
876        ))
877        .fetch_all(&self.pool)
878        .await?;
879
880        rows.into_iter()
881            .map(|row| {
882                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
883                Ok(Community {
884                    id: row.id,
885                    name: row.name,
886                    summary: row.summary,
887                    entity_ids,
888                    fingerprint: row.fingerprint,
889                    created_at: row.created_at,
890                    updated_at: row.updated_at,
891                })
892            })
893            .collect()
894    }
895
896    /// Count the total number of communities.
897    ///
898    /// # Errors
899    ///
900    /// Returns an error if the database query fails.
901    pub async fn community_count(&self) -> Result<i64, MemoryError> {
902        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
903            .fetch_one(&self.pool)
904            .await?;
905        Ok(count)
906    }
907
908    // ── Metadata ──────────────────────────────────────────────────────────────
909
910    /// Get a metadata value by key.
911    ///
912    /// # Errors
913    ///
914    /// Returns an error if the database query fails.
915    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
916        let val: Option<String> =
917            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
918                .bind(key)
919                .fetch_optional(&self.pool)
920                .await?;
921        Ok(val)
922    }
923
924    /// Set a metadata value by key (upsert).
925    ///
926    /// # Errors
927    ///
928    /// Returns an error if the database query fails.
929    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
930        zeph_db::query(sql!(
931            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
932             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
933        ))
934        .bind(key)
935        .bind(value)
936        .execute(&self.pool)
937        .await?;
938        Ok(())
939    }
940
941    /// Get the current extraction count from metadata.
942    ///
943    /// Returns 0 if the counter has not been initialized.
944    ///
945    /// # Errors
946    ///
947    /// Returns an error if the database query fails.
948    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
949        let val = self.get_metadata("extraction_count").await?;
950        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
951    }
952
953    /// Stream all active (non-invalidated) edges.
954    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
955        use futures::StreamExt as _;
956        zeph_db::query_as::<_, EdgeRow>(sql!(
957            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
958                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
959                    edge_type, retrieval_count, last_retrieved_at, superseded_by
960             FROM graph_edges
961             WHERE valid_to IS NULL
962             ORDER BY id ASC"
963        ))
964        .fetch(&self.pool)
965        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
966    }
967
968    /// Fetch a chunk of active edges using keyset pagination.
969    ///
970    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
971    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
972    /// the returned chunk as `after_id` for the next page. An empty result means all
973    /// edges have been consumed.
974    ///
975    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
976    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
977    /// and will appear in subsequent chunks or after the last chunk, never causing
978    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
979    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
980    ///
981    /// # Errors
982    ///
983    /// Returns an error if the database query fails.
984    pub async fn edges_after_id(
985        &self,
986        after_id: i64,
987        limit: i64,
988    ) -> Result<Vec<Edge>, MemoryError> {
989        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
990            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
991                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
992                    edge_type, retrieval_count, last_retrieved_at, superseded_by
993             FROM graph_edges
994             WHERE valid_to IS NULL AND id > ?
995             ORDER BY id ASC
996             LIMIT ?"
997        ))
998        .bind(after_id)
999        .bind(limit)
1000        .fetch_all(&self.pool)
1001        .await?;
1002        Ok(rows.into_iter().map(edge_from_row).collect())
1003    }
1004
1005    /// Find a community by its primary key.
1006    ///
1007    /// # Errors
1008    ///
1009    /// Returns an error if the database query fails or JSON parsing fails.
1010    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1011        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1012            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1013             FROM graph_communities
1014             WHERE id = ?"
1015        ))
1016        .bind(id)
1017        .fetch_optional(&self.pool)
1018        .await?;
1019        match row {
1020            Some(row) => {
1021                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1022                Ok(Some(Community {
1023                    id: row.id,
1024                    name: row.name,
1025                    summary: row.summary,
1026                    entity_ids,
1027                    fingerprint: row.fingerprint,
1028                    created_at: row.created_at,
1029                    updated_at: row.updated_at,
1030                }))
1031            }
1032            None => Ok(None),
1033        }
1034    }
1035
1036    /// Delete all communities (full rebuild before upsert).
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the database query fails.
1041    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1042        zeph_db::query(sql!("DELETE FROM graph_communities"))
1043            .execute(&self.pool)
1044            .await?;
1045        Ok(())
1046    }
1047
1048    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1049
1050    /// Find entities matching `query` and return them with normalized FTS5 scores.
1051    ///
1052    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1053    /// by dividing each negated BM25 value by the maximum in the result set.
1054    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1055    ///
1056    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1057    ///
1058    /// # Errors
1059    ///
1060    /// Returns an error if the database query fails.
1061    #[allow(clippy::too_many_lines)]
1062    pub async fn find_entities_ranked(
1063        &self,
1064        query: &str,
1065        limit: usize,
1066    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1067        // Row type for UNION ALL FTS5 query: (id, name, canonical_name, entity_type,
1068        // summary, first_seen_at, last_seen_at, qdrant_point_id, fts_rank).
1069        type EntityFtsRow = (
1070            i64,
1071            String,
1072            String,
1073            String,
1074            Option<String>,
1075            String,
1076            String,
1077            Option<String>,
1078            f64,
1079        );
1080
1081        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
1082        let query = &query[..query.floor_char_boundary(512)];
1083        let sanitized = sanitize_fts_query(query);
1084        if sanitized.is_empty() {
1085            return Ok(vec![]);
1086        }
1087        let fts_query: String = sanitized
1088            .split_whitespace()
1089            .filter(|t| !FTS5_OPERATORS.contains(t))
1090            .map(|t| format!("{t}*"))
1091            .collect::<Vec<_>>()
1092            .join(" ");
1093        if fts_query.is_empty() {
1094            return Ok(vec![]);
1095        }
1096
1097        let limit_i64 = i64::try_from(limit)?;
1098
1099        // UNION ALL with outer ORDER BY preserves FTS5 BM25 ordering through LIMIT.
1100        // Alias matches get a fixed raw score of 0.5 (below any real BM25 match).
1101        let ranked_fts_sql = format!(
1102            "SELECT * FROM ( \
1103                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1104                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1105                        -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
1106                 FROM graph_entities_fts fts \
1107                 JOIN graph_entities e ON e.id = fts.rowid \
1108                 WHERE graph_entities_fts MATCH ? \
1109                 UNION ALL \
1110                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1111                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1112                        0.5 AS fts_rank \
1113                 FROM graph_entity_aliases a \
1114                 JOIN graph_entities e ON e.id = a.entity_id \
1115                 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
1116             ) \
1117             ORDER BY fts_rank DESC \
1118             LIMIT ?",
1119            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1120        );
1121        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1122            .bind(&fts_query)
1123            .bind(format!(
1124                "%{}%",
1125                query
1126                    .trim()
1127                    .replace('\\', "\\\\")
1128                    .replace('%', "\\%")
1129                    .replace('_', "\\_")
1130            ))
1131            .bind(limit_i64)
1132            .fetch_all(&self.pool)
1133            .await?;
1134
1135        if rows.is_empty() {
1136            return Ok(vec![]);
1137        }
1138
1139        // Normalize FTS scores to [0, 1] by dividing by max; guard against div-by-zero.
1140        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
1141        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
1142
1143        // Deduplicate by entity ID (keep first/highest-ranked occurrence).
1144        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
1145        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
1146        for (
1147            id,
1148            name,
1149            canonical_name,
1150            entity_type_str,
1151            summary,
1152            first_seen_at,
1153            last_seen_at,
1154            qdrant_point_id,
1155            raw_score,
1156        ) in rows
1157        {
1158            if !seen_ids.insert(id) {
1159                continue;
1160            }
1161            let entity_type = entity_type_str
1162                .parse()
1163                .unwrap_or(super::types::EntityType::Concept);
1164            let entity = Entity {
1165                id,
1166                name,
1167                canonical_name,
1168                entity_type,
1169                summary,
1170                first_seen_at,
1171                last_seen_at,
1172                qdrant_point_id,
1173            };
1174            #[allow(clippy::cast_possible_truncation)]
1175            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
1176            result.push((entity, normalized));
1177        }
1178
1179        Ok(result)
1180    }
1181
1182    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1183    ///
1184    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1185    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1186    /// Entities with no edges receive score `0.0`.
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if the database query fails.
1191    pub async fn entity_structural_scores(
1192        &self,
1193        entity_ids: &[i64],
1194    ) -> Result<HashMap<i64, f32>, MemoryError> {
1195        // Each query binds entity_ids three times (three IN clauses).
1196        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1197        const MAX_BATCH: usize = 163;
1198
1199        if entity_ids.is_empty() {
1200            return Ok(HashMap::new());
1201        }
1202
1203        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1204        for chunk in entity_ids.chunks(MAX_BATCH) {
1205            let n = chunk.len();
1206            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1207            let ph1 = placeholder_list(1, n);
1208            let ph2 = placeholder_list(n + 1, n);
1209            let ph3 = placeholder_list(n * 2 + 1, n);
1210
1211            // Build query: count degree and distinct edge types for each entity.
1212            let sql = format!(
1213                "SELECT entity_id,
1214                        COUNT(*) AS degree,
1215                        COUNT(DISTINCT edge_type) AS type_diversity
1216                 FROM (
1217                     SELECT source_entity_id AS entity_id, edge_type
1218                     FROM graph_edges
1219                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1220                     UNION ALL
1221                     SELECT target_entity_id AS entity_id, edge_type
1222                     FROM graph_edges
1223                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1224                 )
1225                 WHERE entity_id IN ({ph3})
1226                 GROUP BY entity_id"
1227            );
1228
1229            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1230            // Bind chunk three times (three IN clauses)
1231            for id in chunk {
1232                query = query.bind(*id);
1233            }
1234            for id in chunk {
1235                query = query.bind(*id);
1236            }
1237            for id in chunk {
1238                query = query.bind(*id);
1239            }
1240
1241            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1242            all_rows.extend(chunk_rows);
1243        }
1244
1245        if all_rows.is_empty() {
1246            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1247        }
1248
1249        let max_degree = all_rows
1250            .iter()
1251            .map(|(_, d, _)| *d)
1252            .max()
1253            .unwrap_or(1)
1254            .max(1);
1255
1256        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1257        for (entity_id, degree, type_diversity) in all_rows {
1258            #[allow(clippy::cast_precision_loss)]
1259            let norm_degree = degree as f32 / max_degree as f32;
1260            #[allow(clippy::cast_precision_loss)]
1261            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1262            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1263            scores.insert(entity_id, score);
1264        }
1265
1266        Ok(scores)
1267    }
1268
1269    /// Look up community IDs for a batch of entity IDs.
1270    ///
1271    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1272    /// are absent from the map (treated as `None` by callers — no community cap applied).
1273    ///
1274    /// # Errors
1275    ///
1276    /// Returns an error if the database query fails.
1277    pub async fn entity_community_ids(
1278        &self,
1279        entity_ids: &[i64],
1280    ) -> Result<HashMap<i64, i64>, MemoryError> {
1281        const MAX_BATCH: usize = 490;
1282
1283        if entity_ids.is_empty() {
1284            return Ok(HashMap::new());
1285        }
1286
1287        let mut result: HashMap<i64, i64> = HashMap::new();
1288        for chunk in entity_ids.chunks(MAX_BATCH) {
1289            let placeholders = placeholder_list(1, chunk.len());
1290
1291            let community_sql = community_ids_sql(&placeholders);
1292            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1293            for id in chunk {
1294                query = query.bind(*id);
1295            }
1296
1297            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1298            result.extend(rows);
1299        }
1300
1301        Ok(result)
1302    }
1303
1304    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1305    ///
1306    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1307    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1308    ///
1309    /// # Errors
1310    ///
1311    /// Returns an error if the database query fails.
1312    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1313        const MAX_BATCH: usize = 490;
1314        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1315        for chunk in edge_ids.chunks(MAX_BATCH) {
1316            let edge_placeholders = placeholder_list(1, chunk.len());
1317            let retrieval_sql = format!(
1318                "UPDATE graph_edges \
1319                 SET retrieval_count = retrieval_count + 1, \
1320                     last_retrieved_at = {epoch_now} \
1321                 WHERE id IN ({edge_placeholders})"
1322            );
1323            let mut q = zeph_db::query(&retrieval_sql);
1324            for id in chunk {
1325                q = q.bind(*id);
1326            }
1327            q.execute(&self.pool).await?;
1328        }
1329        Ok(())
1330    }
1331
1332    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1333    ///
1334    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1335    /// are updated. Returns the number of rows affected.
1336    ///
1337    /// # Errors
1338    ///
1339    /// Returns an error if the database query fails.
1340    pub async fn decay_edge_retrieval_counts(
1341        &self,
1342        decay_lambda: f64,
1343        interval_secs: u64,
1344    ) -> Result<usize, MemoryError> {
1345        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1346        let decay_raw = format!(
1347            "UPDATE graph_edges \
1348             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1349             WHERE valid_to IS NULL \
1350               AND retrieval_count > 0 \
1351               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1352        );
1353        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1354        let result = zeph_db::query(&decay_sql)
1355            .bind(decay_lambda)
1356            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1357            .execute(&self.pool)
1358            .await?;
1359        Ok(usize::try_from(result.rows_affected())?)
1360    }
1361
1362    /// Delete expired edges older than `retention_days` and return count deleted.
1363    ///
1364    /// # Errors
1365    ///
1366    /// Returns an error if the database query fails.
1367    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1368        let days = i64::from(retention_days);
1369        let result = zeph_db::query(sql!(
1370            "DELETE FROM graph_edges
1371             WHERE expired_at IS NOT NULL
1372               AND expired_at < datetime('now', '-' || ? || ' days')"
1373        ))
1374        .bind(days)
1375        .execute(&self.pool)
1376        .await?;
1377        Ok(usize::try_from(result.rows_affected())?)
1378    }
1379
1380    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1381    ///
1382    /// # Errors
1383    ///
1384    /// Returns an error if the database query fails.
1385    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1386        let days = i64::from(retention_days);
1387        let result = zeph_db::query(sql!(
1388            "DELETE FROM graph_entities
1389             WHERE id NOT IN (
1390                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1391                 UNION
1392                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1393             )
1394             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1395        ))
1396        .bind(days)
1397        .execute(&self.pool)
1398        .await?;
1399        Ok(usize::try_from(result.rows_affected())?)
1400    }
1401
1402    /// Delete the oldest excess entities when count exceeds `max_entities`.
1403    ///
1404    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1405    /// Only deletes when `entity_count() > max_entities`.
1406    ///
1407    /// # Errors
1408    ///
1409    /// Returns an error if the database query fails.
1410    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1411        let current = self.entity_count().await?;
1412        let max = i64::try_from(max_entities)?;
1413        if current <= max {
1414            return Ok(0);
1415        }
1416        let excess = current - max;
1417        let result = zeph_db::query(sql!(
1418            "DELETE FROM graph_entities
1419             WHERE id IN (
1420                 SELECT e.id
1421                 FROM graph_entities e
1422                 LEFT JOIN (
1423                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1424                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1425                     UNION ALL
1426                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1427                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1428                 ) edge_counts ON e.id = edge_counts.eid
1429                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1430                 LIMIT ?
1431             )"
1432        ))
1433        .bind(excess)
1434        .execute(&self.pool)
1435        .await?;
1436        Ok(usize::try_from(result.rows_affected())?)
1437    }
1438
1439    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1440
1441    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1442    ///
1443    /// An edge is valid at `timestamp` when:
1444    /// - `valid_from <= timestamp`, AND
1445    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1446    ///
1447    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1448    ///
1449    /// # Errors
1450    ///
1451    /// Returns an error if the database query fails.
1452    pub async fn edges_at_timestamp(
1453        &self,
1454        entity_id: i64,
1455        timestamp: &str,
1456    ) -> Result<Vec<Edge>, MemoryError> {
1457        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1458        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1459        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1460        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1461            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1462                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1463                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1464             FROM graph_edges
1465             WHERE valid_to IS NULL
1466               AND valid_from <= ?
1467               AND (source_entity_id = ? OR target_entity_id = ?)
1468             UNION ALL
1469             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1470                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1471                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1472             FROM graph_edges
1473             WHERE valid_to IS NOT NULL
1474               AND valid_from <= ?
1475               AND valid_to > ?
1476               AND (source_entity_id = ? OR target_entity_id = ?)"
1477        ))
1478        .bind(timestamp)
1479        .bind(entity_id)
1480        .bind(entity_id)
1481        .bind(timestamp)
1482        .bind(timestamp)
1483        .bind(entity_id)
1484        .bind(entity_id)
1485        .fetch_all(&self.pool)
1486        .await?;
1487        Ok(rows.into_iter().map(edge_from_row).collect())
1488    }
1489
1490    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1491    ///
1492    /// The optional `relation` filter restricts results to a specific relation label.
1493    /// Results are ordered by `valid_from DESC` (most recent first).
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if the database query fails.
1498    pub async fn edge_history(
1499        &self,
1500        source_entity_id: i64,
1501        predicate: &str,
1502        relation: Option<&str>,
1503        limit: usize,
1504    ) -> Result<Vec<Edge>, MemoryError> {
1505        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1506        let escaped = predicate
1507            .replace('\\', "\\\\")
1508            .replace('%', "\\%")
1509            .replace('_', "\\_");
1510        let like_pattern = format!("%{escaped}%");
1511        let limit = i64::try_from(limit)?;
1512        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1513            zeph_db::query_as(sql!(
1514                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1515                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1516                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1517                 FROM graph_edges
1518                 WHERE source_entity_id = ?
1519                   AND fact LIKE ? ESCAPE '\\'
1520                   AND relation = ?
1521                 ORDER BY valid_from DESC
1522                 LIMIT ?"
1523            ))
1524            .bind(source_entity_id)
1525            .bind(&like_pattern)
1526            .bind(rel)
1527            .bind(limit)
1528            .fetch_all(&self.pool)
1529            .await?
1530        } else {
1531            zeph_db::query_as(sql!(
1532                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1533                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1534                        edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1535                 FROM graph_edges
1536                 WHERE source_entity_id = ?
1537                   AND fact LIKE ? ESCAPE '\\'
1538                 ORDER BY valid_from DESC
1539                 LIMIT ?"
1540            ))
1541            .bind(source_entity_id)
1542            .bind(&like_pattern)
1543            .bind(limit)
1544            .fetch_all(&self.pool)
1545            .await?
1546        };
1547        Ok(rows.into_iter().map(edge_from_row).collect())
1548    }
1549
1550    // ── BFS Traversal ─────────────────────────────────────────────────────────
1551
1552    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1553    ///
1554    /// Returns all reachable entities and the active edges connecting them.
1555    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1556    /// of `SQLite` CTE limitations.
1557    ///
1558    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1559    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1560    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1561    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1562    ///
1563    /// # Errors
1564    ///
1565    /// Returns an error if any database query fails.
1566    pub async fn bfs(
1567        &self,
1568        start_entity_id: i64,
1569        max_hops: u32,
1570    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1571        self.bfs_with_depth(start_entity_id, max_hops)
1572            .await
1573            .map(|(e, ed, _)| (e, ed))
1574    }
1575
1576    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1577    ///
1578    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1579    /// entity. The start entity itself has depth 0.
1580    ///
1581    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
1582    ///
1583    /// # Errors
1584    ///
1585    /// Returns an error if any database query fails.
1586    pub async fn bfs_with_depth(
1587        &self,
1588        start_entity_id: i64,
1589        max_hops: u32,
1590    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1591        self.bfs_core(start_entity_id, max_hops, None).await
1592    }
1593
1594    /// BFS traversal considering only edges that were valid at `timestamp`.
1595    ///
1596    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1597    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1598    ///
1599    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1600    ///
1601    /// # Errors
1602    ///
1603    /// Returns an error if any database query fails.
1604    pub async fn bfs_at_timestamp(
1605        &self,
1606        start_entity_id: i64,
1607        max_hops: u32,
1608        timestamp: &str,
1609    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1610        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1611            .await
1612    }
1613
1614    /// BFS traversal scoped to specific MAGMA edge types.
1615    ///
1616    /// When `edge_types` is empty, behaves identically to [`bfs_with_depth`] (traverses all
1617    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1618    /// matches one of the provided types.
1619    ///
1620    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1621    /// edges, a temporal query only temporal + semantic edges, etc.
1622    ///
1623    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1624    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1625    ///
1626    /// # Errors
1627    ///
1628    /// Returns an error if any database query fails.
1629    pub async fn bfs_typed(
1630        &self,
1631        start_entity_id: i64,
1632        max_hops: u32,
1633        edge_types: &[EdgeType],
1634    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1635        if edge_types.is_empty() {
1636            return self.bfs_with_depth(start_entity_id, max_hops).await;
1637        }
1638        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1639            .await
1640    }
1641
1642    /// Shared BFS implementation.
1643    ///
1644    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1645    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1646    ///
1647    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1648    /// format string, so there is no SQL injection risk.
1649    async fn bfs_core(
1650        &self,
1651        start_entity_id: i64,
1652        max_hops: u32,
1653        at_timestamp: Option<&str>,
1654    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1655        use std::collections::HashMap;
1656
1657        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1658        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1659        const MAX_FRONTIER: usize = 300;
1660
1661        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1662        let mut frontier: Vec<i64> = vec![start_entity_id];
1663        depth_map.insert(start_entity_id, 0);
1664
1665        for hop in 0..max_hops {
1666            if frontier.is_empty() {
1667                break;
1668            }
1669            frontier.truncate(MAX_FRONTIER);
1670            // IDs come from our own DB — no user input, no injection risk.
1671            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1672            // Timestamp (if any) follows at position 3n+1.
1673            let n = frontier.len();
1674            let ph1 = placeholder_list(1, n);
1675            let ph2 = placeholder_list(n + 1, n);
1676            let ph3 = placeholder_list(n * 2 + 1, n);
1677            let edge_filter = if at_timestamp.is_some() {
1678                let ts_pos = n * 3 + 1;
1679                format!(
1680                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1681                    ts = numbered_placeholder(ts_pos),
1682                )
1683            } else {
1684                "valid_to IS NULL".to_owned()
1685            };
1686            let neighbour_sql = format!(
1687                "SELECT DISTINCT CASE
1688                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1689                     ELSE source_entity_id
1690                 END as neighbour_id
1691                 FROM graph_edges
1692                 WHERE {edge_filter}
1693                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1694            );
1695            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1696            for id in &frontier {
1697                q = q.bind(*id);
1698            }
1699            for id in &frontier {
1700                q = q.bind(*id);
1701            }
1702            for id in &frontier {
1703                q = q.bind(*id);
1704            }
1705            if let Some(ts) = at_timestamp {
1706                q = q.bind(ts);
1707            }
1708            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1709            let mut next_frontier: Vec<i64> = Vec::new();
1710            for nbr in neighbours {
1711                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1712                    e.insert(hop + 1);
1713                    next_frontier.push(nbr);
1714                }
1715            }
1716            frontier = next_frontier;
1717        }
1718
1719        self.bfs_fetch_results(depth_map, at_timestamp).await
1720    }
1721
1722    /// BFS implementation scoped to specific edge types.
1723    ///
1724    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1725    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1726    ///
1727    /// # Errors
1728    ///
1729    /// Returns an error if any database query fails.
1730    async fn bfs_core_typed(
1731        &self,
1732        start_entity_id: i64,
1733        max_hops: u32,
1734        at_timestamp: Option<&str>,
1735        edge_types: &[EdgeType],
1736    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1737        use std::collections::HashMap;
1738
1739        const MAX_FRONTIER: usize = 300;
1740
1741        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1742
1743        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1744        let mut frontier: Vec<i64> = vec![start_entity_id];
1745        depth_map.insert(start_entity_id, 0);
1746
1747        let n_types = type_strs.len();
1748        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1749        let type_in = placeholder_list(1, n_types);
1750        let id_start = n_types + 1;
1751
1752        for hop in 0..max_hops {
1753            if frontier.is_empty() {
1754                break;
1755            }
1756            frontier.truncate(MAX_FRONTIER);
1757
1758            let n_frontier = frontier.len();
1759            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1760            let fp1 = placeholder_list(id_start, n_frontier);
1761            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1762            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1763
1764            let edge_filter = if at_timestamp.is_some() {
1765                let ts_pos = id_start + n_frontier * 3;
1766                format!(
1767                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1768                    ts = numbered_placeholder(ts_pos),
1769                )
1770            } else {
1771                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1772            };
1773
1774            let neighbour_sql = format!(
1775                "SELECT DISTINCT CASE
1776                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1777                     ELSE source_entity_id
1778                 END as neighbour_id
1779                 FROM graph_edges
1780                 WHERE {edge_filter}
1781                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1782            );
1783
1784            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1785            // Bind types first
1786            for t in &type_strs {
1787                q = q.bind(*t);
1788            }
1789            // Bind frontier 3 times
1790            for id in &frontier {
1791                q = q.bind(*id);
1792            }
1793            for id in &frontier {
1794                q = q.bind(*id);
1795            }
1796            for id in &frontier {
1797                q = q.bind(*id);
1798            }
1799            if let Some(ts) = at_timestamp {
1800                q = q.bind(ts);
1801            }
1802
1803            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1804            let mut next_frontier: Vec<i64> = Vec::new();
1805            for nbr in neighbours {
1806                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1807                    e.insert(hop + 1);
1808                    next_frontier.push(nbr);
1809                }
1810            }
1811            frontier = next_frontier;
1812        }
1813
1814        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1815        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1816            .await
1817    }
1818
1819    /// Fetch entities and typed edges for a completed BFS depth map.
1820    ///
1821    /// Filters returned edges by the provided `edge_type` strings.
1822    ///
1823    /// # Errors
1824    ///
1825    /// Returns an error if any database query fails.
1826    async fn bfs_fetch_results_typed(
1827        &self,
1828        depth_map: std::collections::HashMap<i64, u32>,
1829        at_timestamp: Option<&str>,
1830        type_strs: &[&str],
1831    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1832        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1833        if visited_ids.is_empty() {
1834            return Ok((Vec::new(), Vec::new(), depth_map));
1835        }
1836        if visited_ids.len() > 499 {
1837            tracing::warn!(
1838                total = visited_ids.len(),
1839                retained = 499,
1840                "bfs_fetch_results_typed: visited entity set truncated to 499"
1841            );
1842            visited_ids.truncate(499);
1843        }
1844
1845        let n_types = type_strs.len();
1846        let n_visited = visited_ids.len();
1847
1848        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1849        let type_in = placeholder_list(1, n_types);
1850        let id_start = n_types + 1;
1851        let ph_ids1 = placeholder_list(id_start, n_visited);
1852        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1853
1854        let edge_filter = if at_timestamp.is_some() {
1855            let ts_pos = id_start + n_visited * 2;
1856            format!(
1857                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1858                ts = numbered_placeholder(ts_pos),
1859            )
1860        } else {
1861            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1862        };
1863
1864        let edge_sql = format!(
1865            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1866                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1867                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1868             FROM graph_edges
1869             WHERE {edge_filter}
1870               AND source_entity_id IN ({ph_ids1})
1871               AND target_entity_id IN ({ph_ids2})"
1872        );
1873        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1874        for t in type_strs {
1875            edge_query = edge_query.bind(*t);
1876        }
1877        for id in &visited_ids {
1878            edge_query = edge_query.bind(*id);
1879        }
1880        for id in &visited_ids {
1881            edge_query = edge_query.bind(*id);
1882        }
1883        if let Some(ts) = at_timestamp {
1884            edge_query = edge_query.bind(ts);
1885        }
1886        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1887
1888        // For entity query, use plain sequential bind positions (no type prefix offset)
1889        let entity_sql2 = format!(
1890            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1891             FROM graph_entities WHERE id IN ({ph})",
1892            ph = placeholder_list(1, visited_ids.len()),
1893        );
1894        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1895        for id in &visited_ids {
1896            entity_query = entity_query.bind(*id);
1897        }
1898        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1899
1900        let entities: Vec<Entity> = entity_rows
1901            .into_iter()
1902            .map(entity_from_row)
1903            .collect::<Result<Vec<_>, _>>()?;
1904        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1905
1906        Ok((entities, edges, depth_map))
1907    }
1908
1909    /// Fetch entities and edges for a completed BFS depth map.
1910    async fn bfs_fetch_results(
1911        &self,
1912        depth_map: std::collections::HashMap<i64, u32>,
1913        at_timestamp: Option<&str>,
1914    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1915        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1916        if visited_ids.is_empty() {
1917            return Ok((Vec::new(), Vec::new(), depth_map));
1918        }
1919        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1920        if visited_ids.len() > 499 {
1921            tracing::warn!(
1922                total = visited_ids.len(),
1923                retained = 499,
1924                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1925                 some reachable entities will be dropped from results"
1926            );
1927            visited_ids.truncate(499);
1928        }
1929
1930        let n = visited_ids.len();
1931        let ph_ids1 = placeholder_list(1, n);
1932        let ph_ids2 = placeholder_list(n + 1, n);
1933        let edge_filter = if at_timestamp.is_some() {
1934            let ts_pos = n * 2 + 1;
1935            format!(
1936                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1937                ts = numbered_placeholder(ts_pos),
1938            )
1939        } else {
1940            "valid_to IS NULL".to_owned()
1941        };
1942        let edge_sql = format!(
1943            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1944                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1945                    edge_type, retrieval_count, last_retrieved_at, superseded_by
1946             FROM graph_edges
1947             WHERE {edge_filter}
1948               AND source_entity_id IN ({ph_ids1})
1949               AND target_entity_id IN ({ph_ids2})"
1950        );
1951        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1952        for id in &visited_ids {
1953            edge_query = edge_query.bind(*id);
1954        }
1955        for id in &visited_ids {
1956            edge_query = edge_query.bind(*id);
1957        }
1958        if let Some(ts) = at_timestamp {
1959            edge_query = edge_query.bind(ts);
1960        }
1961        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1962
1963        let entity_sql = format!(
1964            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1965             FROM graph_entities WHERE id IN ({ph})",
1966            ph = placeholder_list(1, n),
1967        );
1968        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
1969        for id in &visited_ids {
1970            entity_query = entity_query.bind(*id);
1971        }
1972        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1973
1974        let entities: Vec<Entity> = entity_rows
1975            .into_iter()
1976            .map(entity_from_row)
1977            .collect::<Result<Vec<_>, _>>()?;
1978        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1979
1980        Ok((entities, edges, depth_map))
1981    }
1982
1983    // ── Backfill helpers ──────────────────────────────────────────────────────
1984
1985    /// Find an entity by name only (no type filter).
1986    ///
1987    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1988    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1989    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1990    ///
1991    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1992    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1993    /// contains "Alice").
1994    ///
1995    /// # Errors
1996    ///
1997    /// Returns an error if the database query fails.
1998    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1999        let find_by_name_sql = format!(
2000            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2001             FROM graph_entities \
2002             WHERE name = ? {cn} OR canonical_name = ? {cn} \
2003             LIMIT 5",
2004            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2005        );
2006        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2007            .bind(name)
2008            .bind(name)
2009            .fetch_all(&self.pool)
2010            .await?;
2011
2012        if !rows.is_empty() {
2013            return rows.into_iter().map(entity_from_row).collect();
2014        }
2015
2016        self.find_entities_fuzzy(name, 5).await
2017    }
2018
2019    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2020    ///
2021    /// Reads the `graph_processed` column added by migration 021.
2022    ///
2023    /// # Errors
2024    ///
2025    /// Returns an error if the database query fails.
2026    pub async fn unprocessed_messages_for_backfill(
2027        &self,
2028        limit: usize,
2029    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2030        let limit = i64::try_from(limit)?;
2031        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2032            "SELECT id, content FROM messages
2033             WHERE graph_processed = 0
2034             ORDER BY id ASC
2035             LIMIT ?"
2036        ))
2037        .bind(limit)
2038        .fetch_all(&self.pool)
2039        .await?;
2040        Ok(rows
2041            .into_iter()
2042            .map(|(id, content)| (crate::types::MessageId(id), content))
2043            .collect())
2044    }
2045
2046    /// Return the count of messages not yet processed by graph extraction.
2047    ///
2048    /// # Errors
2049    ///
2050    /// Returns an error if the database query fails.
2051    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2052        let count: i64 = zeph_db::query_scalar(sql!(
2053            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2054        ))
2055        .fetch_one(&self.pool)
2056        .await?;
2057        Ok(count)
2058    }
2059
2060    /// Mark a batch of messages as graph-processed.
2061    ///
2062    /// # Errors
2063    ///
2064    /// Returns an error if the database query fails.
2065    pub async fn mark_messages_graph_processed(
2066        &self,
2067        ids: &[crate::types::MessageId],
2068    ) -> Result<(), MemoryError> {
2069        const MAX_BATCH: usize = 490;
2070        if ids.is_empty() {
2071            return Ok(());
2072        }
2073        for chunk in ids.chunks(MAX_BATCH) {
2074            let placeholders = placeholder_list(1, chunk.len());
2075            let sql =
2076                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2077            let mut query = zeph_db::query(&sql);
2078            for id in chunk {
2079                query = query.bind(id.0);
2080            }
2081            query.execute(&self.pool).await?;
2082        }
2083        Ok(())
2084    }
2085}
2086
2087// ── Dialect helpers ───────────────────────────────────────────────────────────
2088
/// Build the `SQLite` query that maps entity IDs to the communities containing them.
///
/// The statement expands each community's `entity_ids` value with `json_each`, casts every
/// element to an integer, and keeps the rows whose entity ID is in the `IN (...)` list.
/// Each result row is `(entity_id, community_id)`.
///
/// `placeholders` is spliced verbatim into the `IN (...)` clause, so it must be a
/// placeholder list (not raw values); the IDs themselves are expected to be bound by the
/// caller.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2097
/// Build the Postgres query that maps entity IDs to the communities containing them.
///
/// The statement casts each community's `entity_ids` value to `jsonb`, expands it with
/// `jsonb_array_elements_text`, casts every element to `bigint`, and keeps the rows whose
/// entity ID is in the `IN (...)` list. Each result row is `(entity_id, community_id)`.
///
/// `placeholders` is spliced verbatim into the `IN (...)` clause, so it must be a
/// placeholder list (not raw values); the IDs themselves are expected to be bound by the
/// caller.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2107
2108// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2109
/// Raw database row for an entity, as selected from `graph_entities`.
///
/// Field names match the selected column names. Converted into the domain type
/// [`Entity`] by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Display form of the entity name.
    name: String,
    // Normalized key used together with `entity_type` for deduplication.
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    // Timestamps are carried as the strings the database returns, without parsing.
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
2121
2122fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2123    let entity_type = row
2124        .entity_type
2125        .parse::<EntityType>()
2126        .map_err(MemoryError::GraphStore)?;
2127    Ok(Entity {
2128        id: row.id,
2129        name: row.name,
2130        canonical_name: row.canonical_name,
2131        entity_type,
2132        summary: row.summary,
2133        first_seen_at: row.first_seen_at,
2134        last_seen_at: row.last_seen_at,
2135        qdrant_point_id: row.qdrant_point_id,
2136    })
2137}
2138
/// Raw database row for an entity alias.
///
/// Converted field-for-field into [`EntityAlias`] by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // ID of the entity this alias belongs to.
    entity_id: i64,
    alias_name: String,
    // Carried as the string the database returns, without parsing.
    created_at: String,
}
2146
2147fn alias_from_row(row: AliasRow) -> EntityAlias {
2148    EntityAlias {
2149        id: row.id,
2150        entity_id: row.entity_id,
2151        alias_name: row.alias_name,
2152        created_at: row.created_at,
2153    }
2154}
2155
/// Raw database row for an edge, as selected from `graph_edges`.
///
/// Field names match the selected column names, except where a rename is noted below.
/// Converted into the domain type [`Edge`] by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Read as f64 from the database; narrowed to f32 during conversion.
    confidence: f64,
    // Validity window; `valid_to` is NULL while the edge is still active.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Populated from the `episode_id` column; wrapped in `MessageId` during conversion.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` during conversion.
    edge_type: String,
    retrieval_count: i32,
    // NOTE(review): integer timestamp — unit (seconds vs. millis) not visible here; confirm.
    last_retrieved_at: Option<i64>,
    // NOTE(review): presumably the ID of the edge that replaced this one — confirm.
    superseded_by: Option<i64>,
}
2176
2177fn edge_from_row(row: EdgeRow) -> Edge {
2178    let edge_type = row
2179        .edge_type
2180        .parse::<EdgeType>()
2181        .unwrap_or(EdgeType::Semantic);
2182    Edge {
2183        id: row.id,
2184        source_entity_id: row.source_entity_id,
2185        target_entity_id: row.target_entity_id,
2186        relation: row.relation,
2187        fact: row.fact,
2188        #[allow(clippy::cast_possible_truncation)]
2189        confidence: row.confidence as f32,
2190        valid_from: row.valid_from,
2191        valid_to: row.valid_to,
2192        created_at: row.created_at,
2193        expired_at: row.expired_at,
2194        source_message_id: row.source_message_id.map(MessageId),
2195        qdrant_point_id: row.qdrant_point_id,
2196        edge_type,
2197        retrieval_count: row.retrieval_count,
2198        last_retrieved_at: row.last_retrieved_at,
2199        superseded_by: row.superseded_by,
2200    }
2201}
2202
/// Raw database row for a community.
///
/// NOTE(review): no query using this row type is visible in this part of the file;
/// presumably it is read from `graph_communities` — confirm against the community methods.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // Text column; `community_ids_sql` expands `graph_communities.entity_ids` as a JSON
    // array, so this presumably holds a JSON array of entity IDs — confirm.
    entity_ids: String,
    fingerprint: Option<String>,
    // Timestamps are carried as the strings the database returns, without parsing.
    created_at: String,
    updated_at: String,
}
2213
2214// ── GAAMA Episode methods ──────────────────────────────────────────────────────
2215
2216impl GraphStore {
2217    /// Ensure a GAAMA episode exists for this conversation, returning its ID.
2218    ///
2219    /// Idempotent: inserts on first call, returns existing ID on subsequent calls.
2220    ///
2221    /// # Errors
2222    ///
2223    /// Returns an error if the database query fails.
2224    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2225        // Ensure the conversation row exists before inserting into graph_episodes,
2226        // which has a FK referencing conversations(id). On a fresh database the agent
2227        // may run graph extraction before the conversation row is committed.
2228        zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2229            .bind(conversation_id)
2230            .execute(&self.pool)
2231            .await?;
2232
2233        let id: i64 = zeph_db::query_scalar(sql!(
2234            "INSERT INTO graph_episodes (conversation_id)
2235             VALUES (?)
2236             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2237             RETURNING id"
2238        ))
2239        .bind(conversation_id)
2240        .fetch_one(&self.pool)
2241        .await?;
2242        Ok(id)
2243    }
2244
2245    /// Record that an entity was observed in an episode.
2246    ///
2247    /// Idempotent: does nothing if the link already exists.
2248    ///
2249    /// # Errors
2250    ///
2251    /// Returns an error if the database query fails.
2252    pub async fn link_entity_to_episode(
2253        &self,
2254        episode_id: i64,
2255        entity_id: i64,
2256    ) -> Result<(), MemoryError> {
2257        zeph_db::query(sql!(
2258            "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2259             VALUES (?, ?)"
2260        ))
2261        .bind(episode_id)
2262        .bind(entity_id)
2263        .execute(&self.pool)
2264        .await?;
2265        Ok(())
2266    }
2267
2268    /// Return all episodes in which an entity appears.
2269    ///
2270    /// # Errors
2271    ///
2272    /// Returns an error if the database query fails.
2273    pub async fn episodes_for_entity(
2274        &self,
2275        entity_id: i64,
2276    ) -> Result<Vec<super::types::Episode>, MemoryError> {
2277        #[derive(zeph_db::FromRow)]
2278        struct EpisodeRow {
2279            id: i64,
2280            conversation_id: i64,
2281            created_at: String,
2282            closed_at: Option<String>,
2283        }
2284        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2285            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2286             FROM graph_episodes e
2287             JOIN graph_episode_entities ee ON ee.episode_id = e.id
2288             WHERE ee.entity_id = ?"
2289        ))
2290        .bind(entity_id)
2291        .fetch_all(&self.pool)
2292        .await?;
2293        Ok(rows
2294            .into_iter()
2295            .map(|r| super::types::Episode {
2296                id: r.id,
2297                conversation_id: r.conversation_id,
2298                created_at: r.created_at,
2299                closed_at: r.closed_at,
2300            })
2301            .collect())
2302    }
2303}
2304
2305// ── Tests ─────────────────────────────────────────────────────────────────────
2306
2307#[cfg(test)]
2308mod tests;