Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed knowledge graph store.
5//!
6//! [`GraphStore`] manages the `graph_entities`, `graph_edges`, `graph_entity_aliases`,
7//! `graph_communities`, and `graph_episodes` tables. It is the persistence layer for
8//! the MAGMA / GAAMA graph subsystem.
9
10use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::{EntityId, MessageId};
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
/// SQLite-backed persistence layer for the knowledge graph.
///
/// All graph mutations go through this type: entity upserts, edge creation,
/// alias recording, community assignment, and episode management.
///
/// The store's only state is the [`DbPool`] it was built with; every query it
/// issues runs against that pool.
///
/// Obtain an instance via [`GraphStore::new`] after running the `zeph-db` migrations.
pub struct GraphStore {
    /// Connection pool that every query in this store executes against.
    pool: DbPool,
}
33
34impl GraphStore {
35    /// Create a new `GraphStore` wrapping `pool`.
36    ///
37    /// The pool must come from a database with the `zeph-db` migrations applied.
38    #[must_use]
39    pub fn new(pool: DbPool) -> Self {
40        Self { pool }
41    }
42
43    /// Access the underlying database pool.
44    #[must_use]
45    pub fn pool(&self) -> &DbPool {
46        &self.pool
47    }
48
49    // ── Entities ─────────────────────────────────────────────────────────────
50
51    /// Insert or update an entity by `(canonical_name, entity_type)`.
52    ///
53    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
54    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
55    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
56    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the database query fails.
61    pub async fn upsert_entity(
62        &self,
63        surface_name: &str,
64        canonical_name: &str,
65        entity_type: EntityType,
66        summary: Option<&str>,
67    ) -> Result<EntityId, MemoryError> {
68        let type_str = entity_type.as_str();
69        let id: i64 = zeph_db::query_scalar(sql!(
70            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
71             VALUES (?, ?, ?, ?)
72             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
73               name = excluded.name,
74               summary = COALESCE(excluded.summary, summary),
75               last_seen_at = CURRENT_TIMESTAMP
76             RETURNING id"
77        ))
78        .bind(surface_name)
79        .bind(canonical_name)
80        .bind(type_str)
81        .bind(summary)
82        .fetch_one(&self.pool)
83        .await?;
84        Ok(EntityId(id))
85    }
86
87    /// Find an entity by exact canonical name and type.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the database query fails.
92    pub async fn find_entity(
93        &self,
94        canonical_name: &str,
95        entity_type: EntityType,
96    ) -> Result<Option<Entity>, MemoryError> {
97        let type_str = entity_type.as_str();
98        let row: Option<EntityRow> = zeph_db::query_as(
99            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
100             FROM graph_entities
101             WHERE canonical_name = ? AND entity_type = ?"),
102        )
103        .bind(canonical_name)
104        .bind(type_str)
105        .fetch_optional(&self.pool)
106        .await?;
107        row.map(entity_from_row).transpose()
108    }
109
110    /// Find an entity by its numeric ID.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the database query fails.
115    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
116        let row: Option<EntityRow> = zeph_db::query_as(
117            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118             FROM graph_entities
119             WHERE id = ?"),
120        )
121        .bind(entity_id)
122        .fetch_optional(&self.pool)
123        .await?;
124        row.map(entity_from_row).transpose()
125    }
126
127    /// Update the `qdrant_point_id` for an entity.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the database query fails.
132    pub async fn set_entity_qdrant_point_id(
133        &self,
134        entity_id: i64,
135        point_id: &str,
136    ) -> Result<(), MemoryError> {
137        zeph_db::query(sql!(
138            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
139        ))
140        .bind(point_id)
141        .bind(entity_id)
142        .execute(&self.pool)
143        .await?;
144        Ok(())
145    }
146
    /// Find entities matching `query` in the full-text index or in their aliases, returning
    /// at most `limit` results.
    ///
    /// The lookup is a two-armed `UNION`:
    ///
    /// 1. an FTS `MATCH` against `graph_entities_fts`, joined back to `graph_entities` by
    ///    `rowid`, using a prefix query built from the input (`token*` for every token);
    /// 2. a `LIKE '%query%'` substring match against `graph_entity_aliases.alias_name`, with
    ///    `\`, `%` and `_` in the input escaped and the dialect's `COLLATE_NOCASE` fragment
    ///    appended.
    ///
    /// `LIMIT` is applied to the combined result.
    ///
    /// NOTE(review): earlier documentation described bm25 ranking with entity names weighted
    /// 10x over summaries. The SQL in this function contains neither a `bm25()` call nor an
    /// `ORDER BY`, so the order of the returned rows is whatever the database produces for
    /// the `UNION` — confirm whether ranking is expected here.
    ///
    /// # Input handling
    ///
    /// - `query` is truncated to at most 512 bytes, on a UTF-8 character boundary.
    /// - The truncated text is passed through `sanitize_fts_query`; an empty result yields
    ///   `Ok(vec![])` without touching the database.
    /// - Tokens that are exactly `AND`, `OR`, `NOT` or `NEAR` (uppercase) are dropped so they
    ///   cannot be interpreted as FTS5 operators. If nothing is left, `Ok(vec![])` is returned.
    ///
    /// # Behavioral note
    ///
    /// FTS5 prefix matching differs from substring matching: searching "SQL" will match
    /// "`SQLite`" (prefix) but NOT "`GraphQL`" (substring). The alias arm still uses substring
    /// matching via `LIKE`.
    ///
    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
    /// No minimum query length is enforced; if this causes noise in practice, add a minimum
    /// length guard at the call site.
    ///
    /// # Errors
    ///
    /// Returns an error if `limit` does not fit in an `i64`, if the database query fails, or
    /// if a fetched row cannot be converted into an [`Entity`].
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
        // prevents syntax errors when user input contains them as literal search terms
        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap the input at 512 bytes; `floor_char_boundary` keeps the cut on a valid
        // UTF-8 boundary so the slice cannot panic.
        let query = &query[..query.floor_char_boundary(512)];
        // `sanitize_fts_query` is defined in `zeph_db`; presumably it strips characters that
        // are not safe inside an FTS query and leaves whitespace-separated tokens — verify
        // against its implementation.
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Drop operator keywords, then append '*' to each token for FTS5 prefix
        // matching ("graph" -> "graph*"). Tokens are joined with single spaces.
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // NOTE(review): a previous comment here described `bm25(graph_entities_fts, 10.0, 1.0)`
        // with `ORDER BY ... ASC`, but the statement below has no bm25() call and no ORDER BY.
        // The UNION also removes duplicate rows shared by the FTS arm and the alias arm.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        // Bind order: FTS prefix query, LIKE pattern, row limit. The LIKE pattern is built from
        // the truncated (not the sanitized) input; the backslash is escaped first so the
        // escapes added for `%` and `_` are not themselves doubled.
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        // Row conversion is fallible; the first failure aborts the whole collection.
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
228
229    /// Flush the `SQLite` WAL to the main database file.
230    ///
231    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
232    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
233    /// visible to connections opened in future sessions.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the PRAGMA execution fails.
238    #[cfg(all(feature = "sqlite", not(feature = "postgres")))]
239    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
240        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
241            .execute(&self.pool)
242            .await?;
243        Ok(())
244    }
245
246    /// No-op on `PostgreSQL` (WAL management is handled by the server).
247    ///
248    /// # Errors
249    ///
250    /// Never returns an error.
251    #[cfg(feature = "postgres")]
252    #[allow(clippy::unused_async)]
253    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
254        Ok(())
255    }
256
257    /// Stream all entities from the database incrementally (true cursor, no full-table load).
258    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
259        use futures::StreamExt as _;
260        zeph_db::query_as::<_, EntityRow>(
261            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
262             FROM graph_entities ORDER BY id ASC"),
263        )
264        .fetch(&self.pool)
265        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
266            r.map_err(MemoryError::from).and_then(entity_from_row)
267        })
268    }
269
270    // ── Alias methods ─────────────────────────────────────────────────────────
271
272    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the database query fails.
277    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
278        let insert_alias_sql = format!(
279            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
280            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
281            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
282        );
283        zeph_db::query(&insert_alias_sql)
284            .bind(entity_id)
285            .bind(alias_name)
286            .execute(&self.pool)
287            .await?;
288        Ok(())
289    }
290
291    /// Find an entity by alias name and entity type (case-insensitive).
292    ///
293    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the database query fails.
298    pub async fn find_entity_by_alias(
299        &self,
300        alias_name: &str,
301        entity_type: EntityType,
302    ) -> Result<Option<Entity>, MemoryError> {
303        let type_str = entity_type.as_str();
304        let alias_typed_sql = format!(
305            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
306                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
307             FROM graph_entity_aliases a \
308             JOIN graph_entities e ON e.id = a.entity_id \
309             WHERE a.alias_name = ? {} \
310               AND e.entity_type = ? \
311             ORDER BY e.id ASC \
312             LIMIT 1",
313            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
314        );
315        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
316            .bind(alias_name)
317            .bind(type_str)
318            .fetch_optional(&self.pool)
319            .await?;
320        row.map(entity_from_row).transpose()
321    }
322
323    /// Get all aliases for an entity.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the database query fails.
328    pub async fn aliases_for_entity(
329        &self,
330        entity_id: i64,
331    ) -> Result<Vec<EntityAlias>, MemoryError> {
332        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
333            "SELECT id, entity_id, alias_name, created_at
334             FROM graph_entity_aliases
335             WHERE entity_id = ?
336             ORDER BY id ASC"
337        ))
338        .bind(entity_id)
339        .fetch_all(&self.pool)
340        .await?;
341        Ok(rows.into_iter().map(alias_from_row).collect())
342    }
343
344    /// Collect all entities into a Vec.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the database query fails or `entity_type` parsing fails.
349    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
350        use futures::TryStreamExt as _;
351        self.all_entities_stream().try_collect().await
352    }
353
354    /// Count the total number of entities.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if the database query fails.
359    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
360        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
361            .fetch_one(&self.pool)
362            .await?;
363        Ok(count)
364    }
365
366    // ── Edges ─────────────────────────────────────────────────────────────────
367
368    /// Insert a new edge between two entities, or update the existing active edge.
369    ///
370    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
371    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
372    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
373    /// duplicate edges from repeated extraction of the same context messages.
374    ///
375    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
376    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
377    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
378    /// update the first and lose the type classification.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the database query fails.
383    pub async fn insert_edge(
384        &self,
385        source_entity_id: i64,
386        target_entity_id: i64,
387        relation: &str,
388        fact: &str,
389        confidence: f32,
390        episode_id: Option<MessageId>,
391    ) -> Result<i64, MemoryError> {
392        self.insert_edge_typed(
393            source_entity_id,
394            target_entity_id,
395            relation,
396            fact,
397            confidence,
398            episode_id,
399            EdgeType::Semantic,
400        )
401        .await
402    }
403
404    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
405    ///
406    /// Identical semantics to [`Self::insert_edge`] but with an explicit `edge_type` parameter.
407    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the database query fails.
412    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
413    pub async fn insert_edge_typed(
414        &self,
415        source_entity_id: i64,
416        target_entity_id: i64,
417        relation: &str,
418        fact: &str,
419        confidence: f32,
420        episode_id: Option<MessageId>,
421        edge_type: EdgeType,
422    ) -> Result<i64, MemoryError> {
423        if source_entity_id == target_entity_id {
424            return Err(MemoryError::InvalidInput(format!(
425                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
426            )));
427        }
428        let confidence = confidence.clamp(0.0, 1.0);
429        let edge_type_str = edge_type.as_str();
430
431        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
432        // between existence check and write. The unique partial index uq_graph_edges_active
433        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
434        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
435        let mut tx = zeph_db::begin(&self.pool).await?;
436
437        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
438            "SELECT id, confidence FROM graph_edges
439             WHERE source_entity_id = ?
440               AND target_entity_id = ?
441               AND relation = ?
442               AND edge_type = ?
443               AND valid_to IS NULL
444             LIMIT 1"
445        ))
446        .bind(source_entity_id)
447        .bind(target_entity_id)
448        .bind(relation)
449        .bind(edge_type_str)
450        .fetch_optional(&mut *tx)
451        .await?;
452
453        if let Some((existing_id, stored_conf)) = existing {
454            let updated_conf = f64::from(confidence).max(stored_conf);
455            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
456                .bind(updated_conf)
457                .bind(existing_id)
458                .execute(&mut *tx)
459                .await?;
460            tx.commit().await?;
461            return Ok(existing_id);
462        }
463
464        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
465        let id: i64 = zeph_db::query_scalar(sql!(
466            "INSERT INTO graph_edges
467             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
468             VALUES (?, ?, ?, ?, ?, ?, ?)
469             RETURNING id"
470        ))
471        .bind(source_entity_id)
472        .bind(target_entity_id)
473        .bind(relation)
474        .bind(fact)
475        .bind(f64::from(confidence))
476        .bind(episode_raw)
477        .bind(edge_type_str)
478        .fetch_one(&mut *tx)
479        .await?;
480        tx.commit().await?;
481        Ok(id)
482    }
483
484    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the database update fails.
489    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
490        zeph_db::query(sql!(
491            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
492             WHERE id = ?"
493        ))
494        .bind(edge_id)
495        .execute(&self.pool)
496        .await?;
497        Ok(())
498    }
499
500    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
501    ///
502    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the database update fails.
507    pub async fn invalidate_edge_with_supersession(
508        &self,
509        old_edge_id: i64,
510        new_edge_id: i64,
511    ) -> Result<(), MemoryError> {
512        zeph_db::query(sql!(
513            "UPDATE graph_edges
514             SET valid_to = CURRENT_TIMESTAMP,
515                 expired_at = CURRENT_TIMESTAMP,
516                 superseded_by = ?
517             WHERE id = ?"
518        ))
519        .bind(new_edge_id)
520        .bind(old_edge_id)
521        .execute(&self.pool)
522        .await?;
523        Ok(())
524    }
525
526    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
527    ///
528    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
529    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
530    /// in v1 — see `bfs_at_timestamp` for historical traversal).
531    ///
532    /// # `SQLite` bind limit safety
533    ///
534    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
535    /// default). Each entity ID requires two bind slots (source OR target), so batches are
536    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
537    /// compile-time `SQLite` configuration.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if the database query fails.
542    pub async fn edges_for_entities(
543        &self,
544        entity_ids: &[i64],
545        edge_types: &[super::types::EdgeType],
546    ) -> Result<Vec<Edge>, MemoryError> {
547        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
548        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
549        // 490 * 2 = 980, leaving headroom for future query additions.
550        const MAX_BATCH_ENTITIES: usize = 490;
551
552        let mut all_edges: Vec<Edge> = Vec::new();
553
554        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
555            let edges = self.query_batch_edges(chunk, edge_types).await?;
556            all_edges.extend(edges);
557        }
558
559        Ok(all_edges)
560    }
561
562    /// Query active edges for a single chunk of entity IDs (internal helper).
563    ///
564    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if any database query fails.
569    async fn query_batch_edges(
570        &self,
571        entity_ids: &[i64],
572        edge_types: &[super::types::EdgeType],
573    ) -> Result<Vec<Edge>, MemoryError> {
574        if entity_ids.is_empty() {
575            return Ok(Vec::new());
576        }
577
578        // Build a parameterized IN clause with backend-appropriate placeholders.
579        // We cannot use the sql! macro here because the placeholder count is dynamic.
580        let n_ids = entity_ids.len();
581        let n_types = edge_types.len();
582
583        let sql = if n_types == 0 {
584            // placeholders used twice (source IN and target IN)
585            let placeholders = placeholder_list(1, n_ids);
586            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
587            format!(
588                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
589                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
590                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
591                 FROM graph_edges
592                 WHERE valid_to IS NULL
593                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
594            )
595        } else {
596            let placeholders = placeholder_list(1, n_ids);
597            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
598            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
599            format!(
600                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
601                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
602                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
603                 FROM graph_edges
604                 WHERE valid_to IS NULL
605                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
606                   AND edge_type IN ({type_placeholders})"
607            )
608        };
609
610        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
611        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
612        for id in entity_ids {
613            query = query.bind(*id);
614        }
615        for id in entity_ids {
616            query = query.bind(*id);
617        }
618        for et in edge_types {
619            query = query.bind(et.as_str());
620        }
621
622        // Wrap pool.acquire() + query execution in a short timeout to prevent the outer
623        // tokio::time::timeout (in SemanticMemory recall) from cancelling a mid-acquire
624        // future, which causes sqlx 0.8 semaphore count drift and permanent pool starvation.
625        let rows: Vec<EdgeRow> = tokio::time::timeout(
626            std::time::Duration::from_millis(500),
627            query.fetch_all(&self.pool),
628        )
629        .await
630        .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
631        Ok(rows.into_iter().map(edge_from_row).collect())
632    }
633
634    /// Get all active edges where entity is source or target.
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if the database query fails.
639    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
640        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
641            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
642                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
643                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
644             FROM graph_edges
645             WHERE valid_to IS NULL
646               AND (source_entity_id = ? OR target_entity_id = ?)"
647        ))
648        .bind(entity_id)
649        .bind(entity_id)
650        .fetch_all(&self.pool)
651        .await?;
652        Ok(rows.into_iter().map(edge_from_row).collect())
653    }
654
655    /// Get all edges (active and expired) where entity is source or target, ordered by
656    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
657    ///
658    /// # Errors
659    ///
660    /// Returns an error if the database query fails or if `limit` overflows `i64`.
661    pub async fn edge_history_for_entity(
662        &self,
663        entity_id: i64,
664        limit: usize,
665    ) -> Result<Vec<Edge>, MemoryError> {
666        let limit = i64::try_from(limit)?;
667        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
668            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
669                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
670                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
671             FROM graph_edges
672             WHERE source_entity_id = ? OR target_entity_id = ?
673             ORDER BY valid_from DESC
674             LIMIT ?"
675        ))
676        .bind(entity_id)
677        .bind(entity_id)
678        .bind(limit)
679        .fetch_all(&self.pool)
680        .await?;
681        Ok(rows.into_iter().map(edge_from_row).collect())
682    }
683
684    /// Get all active edges between two entities (both directions).
685    ///
686    /// # Errors
687    ///
688    /// Returns an error if the database query fails.
689    pub async fn edges_between(
690        &self,
691        entity_a: i64,
692        entity_b: i64,
693    ) -> Result<Vec<Edge>, MemoryError> {
694        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
695            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
696                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
697                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
698             FROM graph_edges
699             WHERE valid_to IS NULL
700               AND ((source_entity_id = ? AND target_entity_id = ?)
701                 OR (source_entity_id = ? AND target_entity_id = ?))"
702        ))
703        .bind(entity_a)
704        .bind(entity_b)
705        .bind(entity_b)
706        .bind(entity_a)
707        .fetch_all(&self.pool)
708        .await?;
709        Ok(rows.into_iter().map(edge_from_row).collect())
710    }
711
712    /// Get active edges from `source` to `target` in the exact direction (no reverse).
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the database query fails.
717    pub async fn edges_exact(
718        &self,
719        source_entity_id: i64,
720        target_entity_id: i64,
721    ) -> Result<Vec<Edge>, MemoryError> {
722        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
723            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
724                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
725                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
726             FROM graph_edges
727             WHERE valid_to IS NULL
728               AND source_entity_id = ?
729               AND target_entity_id = ?"
730        ))
731        .bind(source_entity_id)
732        .bind(target_entity_id)
733        .fetch_all(&self.pool)
734        .await?;
735        Ok(rows.into_iter().map(edge_from_row).collect())
736    }
737
738    /// Count active (non-invalidated) edges.
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the database query fails.
743    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
744        let count: i64 = zeph_db::query_scalar(sql!(
745            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
746        ))
747        .fetch_one(&self.pool)
748        .await?;
749        Ok(count)
750    }
751
752    /// Return per-type active edge counts as `(edge_type, count)` pairs.
753    ///
754    /// # Errors
755    ///
756    /// Returns an error if the database query fails.
757    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
758        let rows: Vec<(String, i64)> = zeph_db::query_as(
759            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
760        )
761        .fetch_all(&self.pool)
762        .await?;
763        Ok(rows)
764    }
765
766    // ── Communities ───────────────────────────────────────────────────────────
767
768    /// Insert or update a community by name.
769    ///
770    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
771    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
772    /// `assign_to_community` adds an entity without a full re-detection pass).
773    ///
774    /// # Errors
775    ///
776    /// Returns an error if the database query fails or JSON serialization fails.
777    pub async fn upsert_community(
778        &self,
779        name: &str,
780        summary: &str,
781        entity_ids: &[i64],
782        fingerprint: Option<&str>,
783    ) -> Result<i64, MemoryError> {
784        let entity_ids_json = serde_json::to_string(entity_ids)?;
785        let id: i64 = zeph_db::query_scalar(sql!(
786            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
787             VALUES (?, ?, ?, ?)
788             ON CONFLICT(name) DO UPDATE SET
789               summary = excluded.summary,
790               entity_ids = excluded.entity_ids,
791               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
792               updated_at = CURRENT_TIMESTAMP
793             RETURNING id"
794        ))
795        .bind(name)
796        .bind(summary)
797        .bind(entity_ids_json)
798        .bind(fingerprint)
799        .fetch_one(&self.pool)
800        .await?;
801        Ok(id)
802    }
803
804    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
805    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the database query fails.
810    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
811        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
812            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
813        ))
814        .fetch_all(&self.pool)
815        .await?;
816        Ok(rows.into_iter().collect())
817    }
818
819    /// Delete a single community by its primary key.
820    ///
821    /// # Errors
822    ///
823    /// Returns an error if the database query fails.
824    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
825        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
826            .bind(id)
827            .execute(&self.pool)
828            .await?;
829        Ok(())
830    }
831
832    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
833    ///
834    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
835    /// ensuring the next `detect_communities` run re-summarizes the affected community.
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if the database query fails.
840    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
841        zeph_db::query(sql!(
842            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
843        ))
844        .bind(id)
845        .execute(&self.pool)
846        .await?;
847        Ok(())
848    }
849
850    /// Find the first community that contains the given `entity_id`.
851    ///
852    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
853    /// table scan with per-row JSON parsing.
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if the database query fails or JSON parsing fails.
858    pub async fn community_for_entity(
859        &self,
860        entity_id: i64,
861    ) -> Result<Option<Community>, MemoryError> {
862        let row: Option<CommunityRow> = zeph_db::query_as(
863            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
864             FROM graph_communities c, json_each(c.entity_ids) j
865             WHERE CAST(j.value AS INTEGER) = ?
866             LIMIT 1"),
867        )
868        .bind(entity_id)
869        .fetch_optional(&self.pool)
870        .await?;
871        match row {
872            Some(row) => {
873                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
874                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
875                Ok(Some(Community {
876                    id: row.id,
877                    name: row.name,
878                    summary: row.summary,
879                    entity_ids,
880                    fingerprint: row.fingerprint,
881                    created_at: row.created_at,
882                    updated_at: row.updated_at,
883                }))
884            }
885            None => Ok(None),
886        }
887    }
888
889    /// Get all communities.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the database query fails or JSON parsing fails.
894    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
895        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
896            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
897             FROM graph_communities
898             ORDER BY id ASC"
899        ))
900        .fetch_all(&self.pool)
901        .await?;
902
903        rows.into_iter()
904            .map(|row| {
905                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
906                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
907                Ok(Community {
908                    id: row.id,
909                    name: row.name,
910                    summary: row.summary,
911                    entity_ids,
912                    fingerprint: row.fingerprint,
913                    created_at: row.created_at,
914                    updated_at: row.updated_at,
915                })
916            })
917            .collect()
918    }
919
920    /// Count the total number of communities.
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if the database query fails.
925    pub async fn community_count(&self) -> Result<i64, MemoryError> {
926        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
927            .fetch_one(&self.pool)
928            .await?;
929        Ok(count)
930    }
931
932    // ── Metadata ──────────────────────────────────────────────────────────────
933
934    /// Get a metadata value by key.
935    ///
936    /// # Errors
937    ///
938    /// Returns an error if the database query fails.
939    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
940        let val: Option<String> =
941            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
942                .bind(key)
943                .fetch_optional(&self.pool)
944                .await?;
945        Ok(val)
946    }
947
948    /// Set a metadata value by key (upsert).
949    ///
950    /// # Errors
951    ///
952    /// Returns an error if the database query fails.
953    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
954        zeph_db::query(sql!(
955            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
956             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
957        ))
958        .bind(key)
959        .bind(value)
960        .execute(&self.pool)
961        .await?;
962        Ok(())
963    }
964
965    /// Get the current extraction count from metadata.
966    ///
967    /// Returns 0 if the counter has not been initialized.
968    ///
969    /// # Errors
970    ///
971    /// Returns an error if the database query fails.
972    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
973        let val = self.get_metadata("extraction_count").await?;
974        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
975    }
976
977    /// Stream all active (non-invalidated) edges.
978    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
979        use futures::StreamExt as _;
980        zeph_db::query_as::<_, EdgeRow>(sql!(
981            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
982                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
983                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
984             FROM graph_edges
985             WHERE valid_to IS NULL
986             ORDER BY id ASC"
987        ))
988        .fetch(&self.pool)
989        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
990    }
991
992    /// Fetch a chunk of active edges using keyset pagination.
993    ///
994    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
995    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
996    /// the returned chunk as `after_id` for the next page. An empty result means all
997    /// edges have been consumed.
998    ///
999    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
1000    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
1001    /// and will appear in subsequent chunks or after the last chunk, never causing
1002    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
1003    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
1004    ///
1005    /// # Errors
1006    ///
1007    /// Returns an error if the database query fails.
1008    pub async fn edges_after_id(
1009        &self,
1010        after_id: i64,
1011        limit: i64,
1012    ) -> Result<Vec<Edge>, MemoryError> {
1013        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1014            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1015                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1016                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1017             FROM graph_edges
1018             WHERE valid_to IS NULL AND id > ?
1019             ORDER BY id ASC
1020             LIMIT ?"
1021        ))
1022        .bind(after_id)
1023        .bind(limit)
1024        .fetch_all(&self.pool)
1025        .await?;
1026        Ok(rows.into_iter().map(edge_from_row).collect())
1027    }
1028
1029    /// Find a community by its primary key.
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns an error if the database query fails or JSON parsing fails.
1034    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1035        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1036            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1037             FROM graph_communities
1038             WHERE id = ?"
1039        ))
1040        .bind(id)
1041        .fetch_optional(&self.pool)
1042        .await?;
1043        match row {
1044            Some(row) => {
1045                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1046                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
1047                Ok(Some(Community {
1048                    id: row.id,
1049                    name: row.name,
1050                    summary: row.summary,
1051                    entity_ids,
1052                    fingerprint: row.fingerprint,
1053                    created_at: row.created_at,
1054                    updated_at: row.updated_at,
1055                }))
1056            }
1057            None => Ok(None),
1058        }
1059    }
1060
1061    /// Delete all communities (full rebuild before upsert).
1062    ///
1063    /// # Errors
1064    ///
1065    /// Returns an error if the database query fails.
1066    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1067        zeph_db::query(sql!("DELETE FROM graph_communities"))
1068            .execute(&self.pool)
1069            .await?;
1070        Ok(())
1071    }
1072
1073    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1074
1075    /// Find entities matching `query` and return them with normalized FTS5 scores.
1076    ///
1077    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1078    /// by dividing each negated BM25 value by the maximum in the result set.
1079    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1080    ///
1081    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1082    ///
1083    /// # Errors
1084    ///
1085    /// Returns an error if the database query fails.
1086    pub async fn find_entities_ranked(
1087        &self,
1088        query: &str,
1089        limit: usize,
1090    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1091        let query = &query[..query.floor_char_boundary(512)];
1092        let Some(fts_query) = build_fts_query(query) else {
1093            return Ok(vec![]);
1094        };
1095
1096        let limit_i64 = i64::try_from(limit)?;
1097        let ranked_fts_sql = build_ranked_fts_sql();
1098        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1099            .bind(&fts_query)
1100            .bind(format!(
1101                "%{}%",
1102                query
1103                    .trim()
1104                    .replace('\\', "\\\\")
1105                    .replace('%', "\\%")
1106                    .replace('_', "\\_")
1107            ))
1108            .bind(limit_i64)
1109            .fetch_all(&self.pool)
1110            .await?;
1111
1112        if rows.is_empty() {
1113            return Ok(vec![]);
1114        }
1115
1116        Ok(normalize_and_dedup(rows))
1117    }
1118
1119    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1120    ///
1121    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1122    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1123    /// Entities with no edges receive score `0.0`.
1124    ///
1125    /// # Errors
1126    ///
1127    /// Returns an error if the database query fails.
1128    pub async fn entity_structural_scores(
1129        &self,
1130        entity_ids: &[i64],
1131    ) -> Result<HashMap<i64, f32>, MemoryError> {
1132        // Each query binds entity_ids three times (three IN clauses).
1133        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1134        const MAX_BATCH: usize = 163;
1135
1136        if entity_ids.is_empty() {
1137            return Ok(HashMap::new());
1138        }
1139
1140        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1141        for chunk in entity_ids.chunks(MAX_BATCH) {
1142            let n = chunk.len();
1143            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1144            let ph1 = placeholder_list(1, n);
1145            let ph2 = placeholder_list(n + 1, n);
1146            let ph3 = placeholder_list(n * 2 + 1, n);
1147
1148            // Build query: count degree and distinct edge types for each entity.
1149            let sql = format!(
1150                "SELECT entity_id,
1151                        COUNT(*) AS degree,
1152                        COUNT(DISTINCT edge_type) AS type_diversity
1153                 FROM (
1154                     SELECT source_entity_id AS entity_id, edge_type
1155                     FROM graph_edges
1156                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1157                     UNION ALL
1158                     SELECT target_entity_id AS entity_id, edge_type
1159                     FROM graph_edges
1160                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1161                 )
1162                 WHERE entity_id IN ({ph3})
1163                 GROUP BY entity_id"
1164            );
1165
1166            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1167            // Bind chunk three times (three IN clauses)
1168            for id in chunk {
1169                query = query.bind(*id);
1170            }
1171            for id in chunk {
1172                query = query.bind(*id);
1173            }
1174            for id in chunk {
1175                query = query.bind(*id);
1176            }
1177
1178            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1179            all_rows.extend(chunk_rows);
1180        }
1181
1182        if all_rows.is_empty() {
1183            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1184        }
1185
1186        let max_degree = all_rows
1187            .iter()
1188            .map(|(_, d, _)| *d)
1189            .max()
1190            .unwrap_or(1)
1191            .max(1);
1192
1193        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1194        for (entity_id, degree, type_diversity) in all_rows {
1195            #[allow(clippy::cast_precision_loss)]
1196            let norm_degree = degree as f32 / max_degree as f32;
1197            #[allow(clippy::cast_precision_loss)]
1198            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1199            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1200            scores.insert(entity_id, score);
1201        }
1202
1203        Ok(scores)
1204    }
1205
1206    /// Look up community IDs for a batch of entity IDs.
1207    ///
1208    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1209    /// are absent from the map (treated as `None` by callers — no community cap applied).
1210    ///
1211    /// # Errors
1212    ///
1213    /// Returns an error if the database query fails.
1214    #[cfg(any(feature = "sqlite", feature = "postgres"))]
1215    pub async fn entity_community_ids(
1216        &self,
1217        entity_ids: &[i64],
1218    ) -> Result<HashMap<i64, i64>, MemoryError> {
1219        const MAX_BATCH: usize = 490;
1220
1221        if entity_ids.is_empty() {
1222            return Ok(HashMap::new());
1223        }
1224
1225        let mut result: HashMap<i64, i64> = HashMap::new();
1226        for chunk in entity_ids.chunks(MAX_BATCH) {
1227            let placeholders = placeholder_list(1, chunk.len());
1228
1229            let community_sql = community_ids_sql(&placeholders);
1230            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1231            for id in chunk {
1232                query = query.bind(*id);
1233            }
1234
1235            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1236            result.extend(rows);
1237        }
1238
1239        Ok(result)
1240    }
1241
1242    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1243    ///
1244    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1245    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Returns an error if the database query fails.
1250    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1251        const MAX_BATCH: usize = 490;
1252        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1253        for chunk in edge_ids.chunks(MAX_BATCH) {
1254            let edge_placeholders = placeholder_list(1, chunk.len());
1255            let retrieval_sql = format!(
1256                "UPDATE graph_edges \
1257                 SET retrieval_count = retrieval_count + 1, \
1258                     last_retrieved_at = {epoch_now} \
1259                 WHERE id IN ({edge_placeholders})"
1260            );
1261            let mut q = zeph_db::query(&retrieval_sql);
1262            for id in chunk {
1263                q = q.bind(*id);
1264            }
1265            q.execute(&self.pool).await?;
1266        }
1267        Ok(())
1268    }
1269
1270    /// Increment `weight` on the set of edges traversed during the current recall (HL-F2, #3344).
1271    ///
1272    /// Mirrors [`Self::record_edge_retrieval`] in shape: same `MAX_BATCH = 490` chunking,
1273    /// same `WHERE id IN (…) AND valid_to IS NULL` filter (defensive — traversed edges should
1274    /// already be active, but this prevents reinforcing tombstoned edges).
1275    ///
1276    /// No-op when `edge_ids` is empty or `delta == 0.0`.
1277    ///
1278    /// # Errors
1279    ///
1280    /// Returns an error if the underlying `UPDATE` fails.
1281    #[tracing::instrument(
1282        name = "memory.graph.hebbian_increment",
1283        skip_all,
1284        fields(edge_count = edge_ids.len())
1285    )]
1286    pub async fn apply_hebbian_increment(
1287        &self,
1288        edge_ids: &[i64],
1289        delta: f32,
1290    ) -> Result<(), MemoryError> {
1291        // MAX_BATCH chosen to stay under SQLite's 999 host-parameter limit
1292        // ($1 = delta, $2..$N+1 = edge ids — 1 + 490 = 491 params per chunk).
1293        const MAX_BATCH: usize = 490;
1294        if edge_ids.is_empty() || delta == 0.0 {
1295            return Ok(());
1296        }
1297        for chunk in edge_ids.chunks(MAX_BATCH) {
1298            let edge_placeholders = placeholder_list(2, chunk.len());
1299            let sql = format!(
1300                "UPDATE graph_edges \
1301                 SET weight = weight + $1 \
1302                 WHERE id IN ({edge_placeholders}) \
1303                   AND valid_to IS NULL"
1304            );
1305            let mut q = zeph_db::query(&sql);
1306            q = q.bind(f64::from(delta));
1307            for id in chunk {
1308                q = q.bind(*id);
1309            }
1310            q.execute(&self.pool).await?;
1311        }
1312        Ok(())
1313    }
1314
1315    /// Return the subset of `ids` that exist in `graph_entities`.
1316    ///
1317    /// Useful for cross-referencing Qdrant-side entity IDs against the `SQLite` truth.
1318    /// Processes in chunks of 490 to stay under the `SQLite` variable limit (~32 k).
1319    ///
1320    /// # Errors
1321    ///
1322    /// Returns an error if the database query fails.
1323    pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1324        const MAX_BATCH: usize = 490;
1325        if ids.is_empty() {
1326            return Ok(Vec::new());
1327        }
1328        // TODO: chunk when ids.len() > 5_000 to guard against extreme cases
1329        let mut result = Vec::with_capacity(ids.len());
1330        for chunk in ids.chunks(MAX_BATCH) {
1331            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1332            let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1333            let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1334            for id in chunk {
1335                q = q.bind(*id);
1336            }
1337            let rows = q.fetch_all(&self.pool).await?;
1338            for (id,) in rows {
1339                result.push(id);
1340            }
1341        }
1342        Ok(result)
1343    }
1344
1345    /// Resolve entity IDs to their Qdrant point IDs in a single batched `SELECT` (HL-F5, #3346).
1346    ///
1347    /// Entities without a `qdrant_point_id` are silently omitted from the result.
1348    ///
1349    /// # Errors
1350    ///
1351    /// Returns an error if the database query fails.
1352    pub async fn qdrant_point_ids_for_entities(
1353        &self,
1354        entity_ids: &[i64],
1355    ) -> Result<HashMap<i64, String>, MemoryError> {
1356        // Chunk to stay under SQLite variable limit.
1357        const MAX_BATCH: usize = 490;
1358        if entity_ids.is_empty() {
1359            return Ok(HashMap::new());
1360        }
1361        let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1362        for chunk in entity_ids.chunks(MAX_BATCH) {
1363            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1364            let sql = format!(
1365                "SELECT id, qdrant_point_id \
1366                 FROM graph_entities \
1367                 WHERE id IN ({placeholders}) \
1368                   AND qdrant_point_id IS NOT NULL"
1369            );
1370            let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1371            for id in chunk {
1372                q = q.bind(*id);
1373            }
1374            let rows = q.fetch_all(&self.pool).await?;
1375            for (entity_id, point_id) in rows {
1376                result.insert(entity_id, point_id);
1377            }
1378        }
1379        Ok(result)
1380    }
1381
1382    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1383    ///
1384    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1385    /// are updated. Returns the number of rows affected.
1386    ///
1387    /// # Errors
1388    ///
1389    /// Returns an error if the database query fails.
1390    pub async fn decay_edge_retrieval_counts(
1391        &self,
1392        decay_lambda: f64,
1393        interval_secs: u64,
1394    ) -> Result<usize, MemoryError> {
1395        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1396        let decay_raw = format!(
1397            "UPDATE graph_edges \
1398             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1399             WHERE valid_to IS NULL \
1400               AND retrieval_count > 0 \
1401               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1402        );
1403        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1404        let result = zeph_db::query(&decay_sql)
1405            .bind(decay_lambda)
1406            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1407            .execute(&self.pool)
1408            .await?;
1409        Ok(usize::try_from(result.rows_affected())?)
1410    }
1411
1412    /// Delete expired edges older than `retention_days` and return count deleted.
1413    ///
1414    /// # Errors
1415    ///
1416    /// Returns an error if the database query fails.
1417    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1418        let days = i64::from(retention_days);
1419        let result = zeph_db::query(sql!(
1420            "DELETE FROM graph_edges
1421             WHERE expired_at IS NOT NULL
1422               AND expired_at < datetime('now', '-' || ? || ' days')"
1423        ))
1424        .bind(days)
1425        .execute(&self.pool)
1426        .await?;
1427        Ok(usize::try_from(result.rows_affected())?)
1428    }
1429
1430    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if the database query fails.
1435    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1436        let days = i64::from(retention_days);
1437        let result = zeph_db::query(sql!(
1438            "DELETE FROM graph_entities
1439             WHERE id NOT IN (
1440                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1441                 UNION
1442                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1443             )
1444             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1445        ))
1446        .bind(days)
1447        .execute(&self.pool)
1448        .await?;
1449        Ok(usize::try_from(result.rows_affected())?)
1450    }
1451
1452    /// Delete the oldest excess entities when count exceeds `max_entities`.
1453    ///
1454    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1455    /// Only deletes when `entity_count() > max_entities`.
1456    ///
1457    /// # Errors
1458    ///
1459    /// Returns an error if the database query fails.
1460    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1461        let current = self.entity_count().await?;
1462        let max = i64::try_from(max_entities)?;
1463        if current <= max {
1464            return Ok(0);
1465        }
1466        let excess = current - max;
1467        let result = zeph_db::query(sql!(
1468            "DELETE FROM graph_entities
1469             WHERE id IN (
1470                 SELECT e.id
1471                 FROM graph_entities e
1472                 LEFT JOIN (
1473                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1474                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1475                     UNION ALL
1476                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1477                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1478                 ) edge_counts ON e.id = edge_counts.eid
1479                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1480                 LIMIT ?
1481             )"
1482        ))
1483        .bind(excess)
1484        .execute(&self.pool)
1485        .await?;
1486        Ok(usize::try_from(result.rows_affected())?)
1487    }
1488
1489    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1490
1491    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1492    ///
1493    /// An edge is valid at `timestamp` when:
1494    /// - `valid_from <= timestamp`, AND
1495    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1496    ///
1497    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1498    ///
1499    /// # Errors
1500    ///
1501    /// Returns an error if the database query fails.
1502    pub async fn edges_at_timestamp(
1503        &self,
1504        entity_id: i64,
1505        timestamp: &str,
1506    ) -> Result<Vec<Edge>, MemoryError> {
1507        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1508        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1509        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1510        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1511            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1512                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1513                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1514             FROM graph_edges
1515             WHERE valid_to IS NULL
1516               AND valid_from <= ?
1517               AND (source_entity_id = ? OR target_entity_id = ?)
1518             UNION ALL
1519             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1520                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1521                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1522             FROM graph_edges
1523             WHERE valid_to IS NOT NULL
1524               AND valid_from <= ?
1525               AND valid_to > ?
1526               AND (source_entity_id = ? OR target_entity_id = ?)"
1527        ))
1528        .bind(timestamp)
1529        .bind(entity_id)
1530        .bind(entity_id)
1531        .bind(timestamp)
1532        .bind(timestamp)
1533        .bind(entity_id)
1534        .bind(entity_id)
1535        .fetch_all(&self.pool)
1536        .await?;
1537        Ok(rows.into_iter().map(edge_from_row).collect())
1538    }
1539
1540    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1541    ///
1542    /// The optional `relation` filter restricts results to a specific relation label.
1543    /// Results are ordered by `valid_from DESC` (most recent first).
1544    ///
1545    /// # Errors
1546    ///
1547    /// Returns an error if the database query fails.
1548    pub async fn edge_history(
1549        &self,
1550        source_entity_id: i64,
1551        predicate: &str,
1552        relation: Option<&str>,
1553        limit: usize,
1554    ) -> Result<Vec<Edge>, MemoryError> {
1555        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1556        let escaped = predicate
1557            .replace('\\', "\\\\")
1558            .replace('%', "\\%")
1559            .replace('_', "\\_");
1560        let like_pattern = format!("%{escaped}%");
1561        let limit = i64::try_from(limit)?;
1562        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1563            zeph_db::query_as(sql!(
1564                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1565                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1566                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1567                 FROM graph_edges
1568                 WHERE source_entity_id = ?
1569                   AND fact LIKE ? ESCAPE '\\'
1570                   AND relation = ?
1571                 ORDER BY valid_from DESC
1572                 LIMIT ?"
1573            ))
1574            .bind(source_entity_id)
1575            .bind(&like_pattern)
1576            .bind(rel)
1577            .bind(limit)
1578            .fetch_all(&self.pool)
1579            .await?
1580        } else {
1581            zeph_db::query_as(sql!(
1582                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1583                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1584                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1585                 FROM graph_edges
1586                 WHERE source_entity_id = ?
1587                   AND fact LIKE ? ESCAPE '\\'
1588                 ORDER BY valid_from DESC
1589                 LIMIT ?"
1590            ))
1591            .bind(source_entity_id)
1592            .bind(&like_pattern)
1593            .bind(limit)
1594            .fetch_all(&self.pool)
1595            .await?
1596        };
1597        Ok(rows.into_iter().map(edge_from_row).collect())
1598    }
1599
1600    // ── BFS Traversal ─────────────────────────────────────────────────────────
1601
1602    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1603    ///
1604    /// Returns all reachable entities and the active edges connecting them.
1605    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1606    /// of `SQLite` CTE limitations.
1607    ///
1608    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1609    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1610    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1611    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1612    ///
1613    /// # Errors
1614    ///
1615    /// Returns an error if any database query fails.
1616    pub async fn bfs(
1617        &self,
1618        start_entity_id: i64,
1619        max_hops: u32,
1620    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1621        self.bfs_with_depth(start_entity_id, max_hops)
1622            .await
1623            .map(|(e, ed, _)| (e, ed))
1624    }
1625
1626    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1627    ///
1628    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1629    /// entity. The start entity itself has depth 0.
1630    ///
1631    /// **`SQLite` bind parameter limit**: see [`Self::bfs`] for notes on frontier size limits.
1632    ///
1633    /// # Errors
1634    ///
1635    /// Returns an error if any database query fails.
1636    pub async fn bfs_with_depth(
1637        &self,
1638        start_entity_id: i64,
1639        max_hops: u32,
1640    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1641        self.bfs_core(start_entity_id, max_hops, None).await
1642    }
1643
1644    /// BFS traversal considering only edges that were valid at `timestamp`.
1645    ///
1646    /// Equivalent to [`Self::bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1647    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1648    ///
1649    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1650    ///
1651    /// # Errors
1652    ///
1653    /// Returns an error if any database query fails.
1654    pub async fn bfs_at_timestamp(
1655        &self,
1656        start_entity_id: i64,
1657        max_hops: u32,
1658        timestamp: &str,
1659    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1660        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1661            .await
1662    }
1663
1664    /// BFS traversal scoped to specific MAGMA edge types.
1665    ///
1666    /// When `edge_types` is empty, behaves identically to [`Self::bfs_with_depth`] (traverses all
1667    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1668    /// matches one of the provided types.
1669    ///
1670    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1671    /// edges, a temporal query only temporal + semantic edges, etc.
1672    ///
1673    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1674    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1675    ///
1676    /// # Errors
1677    ///
1678    /// Returns an error if any database query fails.
1679    pub async fn bfs_typed(
1680        &self,
1681        start_entity_id: i64,
1682        max_hops: u32,
1683        edge_types: &[EdgeType],
1684    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1685        if edge_types.is_empty() {
1686            return self.bfs_with_depth(start_entity_id, max_hops).await;
1687        }
1688        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1689            .await
1690    }
1691
1692    /// Shared BFS implementation.
1693    ///
1694    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1695    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1696    ///
1697    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1698    /// format string, so there is no SQL injection risk.
1699    async fn bfs_core(
1700        &self,
1701        start_entity_id: i64,
1702        max_hops: u32,
1703        at_timestamp: Option<&str>,
1704    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1705        use std::collections::HashMap;
1706
1707        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1708        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1709        const MAX_FRONTIER: usize = 300;
1710
1711        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1712        let mut frontier: Vec<i64> = vec![start_entity_id];
1713        depth_map.insert(start_entity_id, 0);
1714
1715        for hop in 0..max_hops {
1716            if frontier.is_empty() {
1717                break;
1718            }
1719            frontier.truncate(MAX_FRONTIER);
1720            // IDs come from our own DB — no user input, no injection risk.
1721            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1722            // Timestamp (if any) follows at position 3n+1.
1723            let n = frontier.len();
1724            let ph1 = placeholder_list(1, n);
1725            let ph2 = placeholder_list(n + 1, n);
1726            let ph3 = placeholder_list(n * 2 + 1, n);
1727            let edge_filter = if at_timestamp.is_some() {
1728                let ts_pos = n * 3 + 1;
1729                format!(
1730                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1731                    ts = numbered_placeholder(ts_pos),
1732                )
1733            } else {
1734                "valid_to IS NULL".to_owned()
1735            };
1736            let neighbour_sql = format!(
1737                "SELECT DISTINCT CASE
1738                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1739                     ELSE source_entity_id
1740                 END as neighbour_id
1741                 FROM graph_edges
1742                 WHERE {edge_filter}
1743                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1744            );
1745            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1746            for id in &frontier {
1747                q = q.bind(*id);
1748            }
1749            for id in &frontier {
1750                q = q.bind(*id);
1751            }
1752            for id in &frontier {
1753                q = q.bind(*id);
1754            }
1755            if let Some(ts) = at_timestamp {
1756                q = q.bind(ts);
1757            }
1758            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1759            let mut next_frontier: Vec<i64> = Vec::new();
1760            for nbr in neighbours {
1761                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1762                    e.insert(hop + 1);
1763                    next_frontier.push(nbr);
1764                }
1765            }
1766            frontier = next_frontier;
1767        }
1768
1769        self.bfs_fetch_results(depth_map, at_timestamp).await
1770    }
1771
1772    /// BFS implementation scoped to specific edge types.
1773    ///
1774    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1775    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1776    ///
1777    /// # Errors
1778    ///
1779    /// Returns an error if any database query fails.
1780    async fn bfs_core_typed(
1781        &self,
1782        start_entity_id: i64,
1783        max_hops: u32,
1784        at_timestamp: Option<&str>,
1785        edge_types: &[EdgeType],
1786    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1787        use std::collections::HashMap;
1788
1789        const MAX_FRONTIER: usize = 300;
1790
1791        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1792
1793        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1794        let mut frontier: Vec<i64> = vec![start_entity_id];
1795        depth_map.insert(start_entity_id, 0);
1796
1797        let n_types = type_strs.len();
1798        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1799        let type_in = placeholder_list(1, n_types);
1800        let id_start = n_types + 1;
1801
1802        for hop in 0..max_hops {
1803            if frontier.is_empty() {
1804                break;
1805            }
1806            frontier.truncate(MAX_FRONTIER);
1807
1808            let n_frontier = frontier.len();
1809            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1810            let fp1 = placeholder_list(id_start, n_frontier);
1811            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1812            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1813
1814            let edge_filter = if at_timestamp.is_some() {
1815                let ts_pos = id_start + n_frontier * 3;
1816                format!(
1817                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1818                    ts = numbered_placeholder(ts_pos),
1819                )
1820            } else {
1821                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1822            };
1823
1824            let neighbour_sql = format!(
1825                "SELECT DISTINCT CASE
1826                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1827                     ELSE source_entity_id
1828                 END as neighbour_id
1829                 FROM graph_edges
1830                 WHERE {edge_filter}
1831                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1832            );
1833
1834            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1835            // Bind types first
1836            for t in &type_strs {
1837                q = q.bind(*t);
1838            }
1839            // Bind frontier 3 times
1840            for id in &frontier {
1841                q = q.bind(*id);
1842            }
1843            for id in &frontier {
1844                q = q.bind(*id);
1845            }
1846            for id in &frontier {
1847                q = q.bind(*id);
1848            }
1849            if let Some(ts) = at_timestamp {
1850                q = q.bind(ts);
1851            }
1852
1853            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1854            let mut next_frontier: Vec<i64> = Vec::new();
1855            for nbr in neighbours {
1856                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1857                    e.insert(hop + 1);
1858                    next_frontier.push(nbr);
1859                }
1860            }
1861            frontier = next_frontier;
1862        }
1863
1864        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1865        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1866            .await
1867    }
1868
1869    /// Fetch entities and typed edges for a completed BFS depth map.
1870    ///
1871    /// Filters returned edges by the provided `edge_type` strings.
1872    ///
1873    /// # Errors
1874    ///
1875    /// Returns an error if any database query fails.
1876    async fn bfs_fetch_results_typed(
1877        &self,
1878        depth_map: std::collections::HashMap<i64, u32>,
1879        at_timestamp: Option<&str>,
1880        type_strs: &[&str],
1881    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1882        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1883        if visited_ids.is_empty() {
1884            return Ok((Vec::new(), Vec::new(), depth_map));
1885        }
1886        if visited_ids.len() > 499 {
1887            tracing::warn!(
1888                total = visited_ids.len(),
1889                retained = 499,
1890                "bfs_fetch_results_typed: visited entity set truncated to 499"
1891            );
1892            visited_ids.truncate(499);
1893        }
1894
1895        let n_types = type_strs.len();
1896        let n_visited = visited_ids.len();
1897
1898        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1899        let type_in = placeholder_list(1, n_types);
1900        let id_start = n_types + 1;
1901        let ph_ids1 = placeholder_list(id_start, n_visited);
1902        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1903
1904        let edge_filter = if at_timestamp.is_some() {
1905            let ts_pos = id_start + n_visited * 2;
1906            format!(
1907                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1908                ts = numbered_placeholder(ts_pos),
1909            )
1910        } else {
1911            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1912        };
1913
1914        let edge_sql = format!(
1915            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1916                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1917                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1918             FROM graph_edges
1919             WHERE {edge_filter}
1920               AND source_entity_id IN ({ph_ids1})
1921               AND target_entity_id IN ({ph_ids2})"
1922        );
1923        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1924        for t in type_strs {
1925            edge_query = edge_query.bind(*t);
1926        }
1927        for id in &visited_ids {
1928            edge_query = edge_query.bind(*id);
1929        }
1930        for id in &visited_ids {
1931            edge_query = edge_query.bind(*id);
1932        }
1933        if let Some(ts) = at_timestamp {
1934            edge_query = edge_query.bind(ts);
1935        }
1936        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1937
1938        // For entity query, use plain sequential bind positions (no type prefix offset)
1939        let entity_sql2 = format!(
1940            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1941             FROM graph_entities WHERE id IN ({ph})",
1942            ph = placeholder_list(1, visited_ids.len()),
1943        );
1944        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1945        for id in &visited_ids {
1946            entity_query = entity_query.bind(*id);
1947        }
1948        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1949
1950        let entities: Vec<Entity> = entity_rows
1951            .into_iter()
1952            .map(entity_from_row)
1953            .collect::<Result<Vec<_>, _>>()?;
1954        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1955
1956        Ok((entities, edges, depth_map))
1957    }
1958
1959    /// Fetch entities and edges for a completed BFS depth map.
1960    async fn bfs_fetch_results(
1961        &self,
1962        depth_map: std::collections::HashMap<i64, u32>,
1963        at_timestamp: Option<&str>,
1964    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1965        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1966        if visited_ids.is_empty() {
1967            return Ok((Vec::new(), Vec::new(), depth_map));
1968        }
1969        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1970        if visited_ids.len() > 499 {
1971            tracing::warn!(
1972                total = visited_ids.len(),
1973                retained = 499,
1974                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1975                 some reachable entities will be dropped from results"
1976            );
1977            visited_ids.truncate(499);
1978        }
1979
1980        let n = visited_ids.len();
1981        let ph_ids1 = placeholder_list(1, n);
1982        let ph_ids2 = placeholder_list(n + 1, n);
1983        let edge_filter = if at_timestamp.is_some() {
1984            let ts_pos = n * 2 + 1;
1985            format!(
1986                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1987                ts = numbered_placeholder(ts_pos),
1988            )
1989        } else {
1990            "valid_to IS NULL".to_owned()
1991        };
1992        let edge_sql = format!(
1993            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1994                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1995                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1996             FROM graph_edges
1997             WHERE {edge_filter}
1998               AND source_entity_id IN ({ph_ids1})
1999               AND target_entity_id IN ({ph_ids2})"
2000        );
2001        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
2002        for id in &visited_ids {
2003            edge_query = edge_query.bind(*id);
2004        }
2005        for id in &visited_ids {
2006            edge_query = edge_query.bind(*id);
2007        }
2008        if let Some(ts) = at_timestamp {
2009            edge_query = edge_query.bind(ts);
2010        }
2011        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
2012
2013        let entity_sql = format!(
2014            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
2015             FROM graph_entities WHERE id IN ({ph})",
2016            ph = placeholder_list(1, n),
2017        );
2018        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
2019        for id in &visited_ids {
2020            entity_query = entity_query.bind(*id);
2021        }
2022        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
2023
2024        let entities: Vec<Entity> = entity_rows
2025            .into_iter()
2026            .map(entity_from_row)
2027            .collect::<Result<Vec<_>, _>>()?;
2028        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
2029
2030        Ok((entities, edges, depth_map))
2031    }
2032
2033    // ── Backfill helpers ──────────────────────────────────────────────────────
2034
2035    /// Find an entity by name only (no type filter).
2036    ///
2037    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
2038    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
2039    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
2040    ///
2041    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
2042    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
2043    /// contains "Alice").
2044    ///
2045    /// # Errors
2046    ///
2047    /// Returns an error if the database query fails.
2048    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2049        let find_by_name_sql = format!(
2050            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2051             FROM graph_entities \
2052             WHERE name = ? {cn} OR canonical_name = ? {cn} \
2053             LIMIT 5",
2054            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2055        );
2056        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2057            .bind(name)
2058            .bind(name)
2059            .fetch_all(&self.pool)
2060            .await?;
2061
2062        if !rows.is_empty() {
2063            return rows.into_iter().map(entity_from_row).collect();
2064        }
2065
2066        self.find_entities_fuzzy(name, 5).await
2067    }
2068
2069    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2070    ///
2071    /// Reads the `graph_processed` column added by migration 021.
2072    ///
2073    /// # Errors
2074    ///
2075    /// Returns an error if the database query fails.
2076    pub async fn unprocessed_messages_for_backfill(
2077        &self,
2078        limit: usize,
2079    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2080        let limit = i64::try_from(limit)?;
2081        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2082            "SELECT id, content FROM messages
2083             WHERE graph_processed = 0
2084             ORDER BY id ASC
2085             LIMIT ?"
2086        ))
2087        .bind(limit)
2088        .fetch_all(&self.pool)
2089        .await?;
2090        Ok(rows
2091            .into_iter()
2092            .map(|(id, content)| (crate::types::MessageId(id), content))
2093            .collect())
2094    }
2095
2096    /// Return the count of messages not yet processed by graph extraction.
2097    ///
2098    /// # Errors
2099    ///
2100    /// Returns an error if the database query fails.
2101    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2102        let count: i64 = zeph_db::query_scalar(sql!(
2103            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2104        ))
2105        .fetch_one(&self.pool)
2106        .await?;
2107        Ok(count)
2108    }
2109
2110    /// Mark a batch of messages as graph-processed.
2111    ///
2112    /// # Errors
2113    ///
2114    /// Returns an error if the database query fails.
2115    pub async fn mark_messages_graph_processed(
2116        &self,
2117        ids: &[crate::types::MessageId],
2118    ) -> Result<(), MemoryError> {
2119        const MAX_BATCH: usize = 490;
2120        if ids.is_empty() {
2121            return Ok(());
2122        }
2123        for chunk in ids.chunks(MAX_BATCH) {
2124            let placeholders = placeholder_list(1, chunk.len());
2125            let sql =
2126                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2127            let mut query = zeph_db::query(&sql);
2128            for id in chunk {
2129                query = query.bind(id.0);
2130            }
2131            query.execute(&self.pool).await?;
2132        }
2133        Ok(())
2134    }
2135}
2136
2137// ── Dialect helpers ───────────────────────────────────────────────────────────
2138
2139#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
/// Build the `SQLite` query that maps entity IDs to the communities containing them.
///
/// `graph_communities.entity_ids` is expanded with `json_each`, each element is cast to
/// an integer, and rows are kept when that integer is in the `IN (...)` list.
/// `placeholders` is inserted verbatim as the contents of that list.
fn community_ids_sql(placeholders: &str) -> String {
    let mut sql = String::from(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN (",
    );
    sql.push_str(placeholders);
    sql.push(')');
    sql
}
2147
2148#[cfg(feature = "postgres")]
/// Build the `PostgreSQL` query that maps entity IDs to the communities containing them.
///
/// `graph_communities.entity_ids` is cast to `jsonb` and expanded with
/// `jsonb_array_elements_text`; each element is cast to `bigint`, and rows are kept when
/// that value is in the `IN (...)` list. `placeholders` is inserted verbatim as the
/// contents of that list.
fn community_ids_sql(placeholders: &str) -> String {
    let mut sql = String::from(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN (",
    );
    sql.push_str(placeholders);
    sql.push(')');
    sql
}
2157
2158// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2159
/// Raw database row for an entity, as produced by the `graph_entities` SELECTs in this file.
///
/// Field names match the selected column names. The row is turned into an [`Entity`]
/// by `entity_from_row`, which parses `entity_type` and wraps `id` in `EntityId`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    canonical_name: String,
    // Textual form of the entity type; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    // Timestamps are carried as strings and passed through unchanged.
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
2171
2172fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2173    let entity_type = row
2174        .entity_type
2175        .parse::<EntityType>()
2176        .map_err(MemoryError::GraphStore)?;
2177    Ok(Entity {
2178        id: EntityId(row.id),
2179        name: row.name,
2180        canonical_name: row.canonical_name,
2181        entity_type,
2182        summary: row.summary,
2183        first_seen_at: row.first_seen_at,
2184        last_seen_at: row.last_seen_at,
2185        qdrant_point_id: row.qdrant_point_id,
2186    })
2187}
2188
/// Raw database row for an entity alias; converted to [`EntityAlias`] by `alias_from_row`.
///
/// NOTE(review): presumably read from the `graph_entity_aliases` table named in the
/// module docs — the query that fills this row is not in this part of the file; confirm.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Raw entity key; wrapped in `EntityId` during conversion.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2196
2197fn alias_from_row(row: AliasRow) -> EntityAlias {
2198    EntityAlias {
2199        id: row.id,
2200        entity_id: EntityId(row.entity_id),
2201        alias_name: row.alias_name,
2202        created_at: row.created_at,
2203    }
2204}
2205
/// Raw database row for an edge, as produced by the `graph_edges` SELECTs in this file.
///
/// Field names match the selected column names, except `source_message_id`, which is
/// filled from the `episode_id` column. The row is turned into an [`Edge`] by
/// `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Read as f64 and narrowed to f32 during conversion.
    confidence: f64,
    valid_from: String,
    // `None` for edges that are still active (`valid_to IS NULL` in the queries).
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // The column is called `episode_id`; the field uses the name found on `Edge`.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Textual form of the edge type; parsed into `EdgeType` during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    superseded_by: Option<i64>,
    // When absent, conversion falls back to `relation`.
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    // Hebbian reinforcement weight (HL-F1, #3344). SQLite REAL maps to f64 via sqlx.
    weight: f64,
}
2230
2231fn edge_from_row(row: EdgeRow) -> Edge {
2232    let edge_type = row
2233        .edge_type
2234        .parse::<EdgeType>()
2235        .unwrap_or(EdgeType::Semantic);
2236    let canonical_relation = row
2237        .canonical_relation
2238        .unwrap_or_else(|| row.relation.clone());
2239    Edge {
2240        id: row.id,
2241        source_entity_id: row.source_entity_id,
2242        target_entity_id: row.target_entity_id,
2243        canonical_relation,
2244        relation: row.relation,
2245        fact: row.fact,
2246        #[allow(clippy::cast_possible_truncation)]
2247        confidence: row.confidence as f32,
2248        valid_from: row.valid_from,
2249        valid_to: row.valid_to,
2250        created_at: row.created_at,
2251        expired_at: row.expired_at,
2252        source_message_id: row.source_message_id.map(MessageId),
2253        qdrant_point_id: row.qdrant_point_id,
2254        edge_type,
2255        retrieval_count: row.retrieval_count,
2256        last_retrieved_at: row.last_retrieved_at,
2257        superseded_by: row.superseded_by,
2258        supersedes: row.supersedes,
2259        #[allow(clippy::cast_possible_truncation)]
2260        weight: row.weight as f32,
2261    }
2262}
2263
/// Raw database row for a community.
///
/// NOTE(review): presumably read from `graph_communities` and converted to `Community`
/// elsewhere in this file — neither the query nor the conversion is in this part; confirm.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // Held as text here; `community_ids_sql` expands the `entity_ids` column as a JSON array.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2274
2275// ── GAAMA Episode methods ──────────────────────────────────────────────────────
2276
2277impl GraphStore {
2278    /// Ensure a GAAMA episode exists for this conversation, returning its ID.
2279    ///
2280    /// Idempotent: inserts on first call, returns existing ID on subsequent calls.
2281    ///
2282    /// # Errors
2283    ///
2284    /// Returns an error if the database query fails.
2285    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2286        // Ensure the conversation row exists before inserting into graph_episodes,
2287        // which has a FK referencing conversations(id). On a fresh database the agent
2288        // may run graph extraction before the conversation row is committed.
2289        zeph_db::query(sql!(
2290            "INSERT INTO conversations (id) VALUES (?)
2291             ON CONFLICT (id) DO NOTHING"
2292        ))
2293        .bind(conversation_id)
2294        .execute(&self.pool)
2295        .await?;
2296
2297        let id: i64 = zeph_db::query_scalar(sql!(
2298            "INSERT INTO graph_episodes (conversation_id)
2299             VALUES (?)
2300             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2301             RETURNING id"
2302        ))
2303        .bind(conversation_id)
2304        .fetch_one(&self.pool)
2305        .await?;
2306        Ok(id)
2307    }
2308
2309    /// Record that an entity was observed in an episode.
2310    ///
2311    /// Idempotent: does nothing if the link already exists.
2312    ///
2313    /// # Errors
2314    ///
2315    /// Returns an error if the database query fails.
2316    pub async fn link_entity_to_episode(
2317        &self,
2318        episode_id: i64,
2319        entity_id: i64,
2320    ) -> Result<(), MemoryError> {
2321        zeph_db::query(sql!(
2322            "INSERT INTO graph_episode_entities (episode_id, entity_id)
2323             VALUES (?, ?)
2324             ON CONFLICT (episode_id, entity_id) DO NOTHING"
2325        ))
2326        .bind(episode_id)
2327        .bind(entity_id)
2328        .execute(&self.pool)
2329        .await?;
2330        Ok(())
2331    }
2332
2333    /// Return all episodes in which an entity appears.
2334    ///
2335    /// # Errors
2336    ///
2337    /// Returns an error if the database query fails.
2338    pub async fn episodes_for_entity(
2339        &self,
2340        entity_id: i64,
2341    ) -> Result<Vec<super::types::Episode>, MemoryError> {
2342        #[derive(zeph_db::FromRow)]
2343        struct EpisodeRow {
2344            id: i64,
2345            conversation_id: i64,
2346            created_at: String,
2347            closed_at: Option<String>,
2348        }
2349        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2350            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2351             FROM graph_episodes e
2352             JOIN graph_episode_entities ee ON ee.episode_id = e.id
2353             WHERE ee.entity_id = ?"
2354        ))
2355        .bind(entity_id)
2356        .fetch_all(&self.pool)
2357        .await?;
2358        Ok(rows
2359            .into_iter()
2360            .map(|r| super::types::Episode {
2361                id: r.id,
2362                conversation_id: r.conversation_id,
2363                created_at: r.created_at,
2364                closed_at: r.closed_at,
2365            })
2366            .collect())
2367    }
2368
2369    /// Insert a new edge using APEX-MEM append-only supersede semantics (FR-001).
2370    ///
2371    /// If a byte-identical active edge exists for `(src, tgt, canonical_relation, edge_type)`,
2372    /// records a reassertion in `edge_reassertions` and returns the existing edge id (FR-015).
2373    ///
2374    /// If a different active edge exists for the same `(src, canonical_relation, edge_type)` key,
2375    /// invalidates it with a supersession pointer, inserts the new edge, and sets `supersedes` on
2376    /// the new row to link the chain. Checks that the resulting chain depth would not exceed
2377    /// [`SUPERSEDE_DEPTH_CAP`] and that no cycle would be introduced before committing.
2378    ///
2379    /// Optionally increments [`ApexMetrics::supersedes_total`] when `metrics` is provided and a
2380    /// supersession actually occurs.
2381    ///
2382    /// # Errors
2383    ///
2384    /// - [`MemoryError::SupersedeCycle`] — the new edge would create a supersede cycle.
2385    /// - [`MemoryError::SupersedeDepthExceeded`] — chain depth cap would be exceeded.
2386    /// - [`MemoryError::Sqlx`] / [`MemoryError::Db`] — database errors.
2387    #[allow(clippy::too_many_arguments)]
2388    // TODO(B3): refactor into a builder or config struct to reduce argument count
2389    // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2390    #[tracing::instrument(name = "memory.graph.insert_or_supersede", skip_all)]
2391    pub async fn insert_or_supersede_with_metrics(
2392        &self,
2393        source_entity_id: i64,
2394        target_entity_id: i64,
2395        relation: &str,
2396        canonical_relation: &str,
2397        fact: &str,
2398        confidence: f32,
2399        episode_id: Option<MessageId>,
2400        edge_type: EdgeType,
2401        set_supersedes: bool,
2402        metrics: Option<&ApexMetrics>,
2403    ) -> Result<i64, MemoryError> {
2404        if source_entity_id == target_entity_id {
2405            return Err(MemoryError::InvalidInput(format!(
2406                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2407            )));
2408        }
2409        let confidence = confidence.clamp(0.0, 1.0);
2410        let edge_type_str = edge_type.as_str();
2411        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2412
2413        let mut tx = zeph_db::begin(&self.pool).await?;
2414
2415        if let Some(existing_id) = find_identical_active_edge(
2416            &mut tx,
2417            source_entity_id,
2418            target_entity_id,
2419            canonical_relation,
2420            edge_type_str,
2421            fact,
2422        )
2423        .await?
2424        {
2425            record_reassertion(&mut tx, existing_id, episode_raw, confidence).await?;
2426            tx.commit().await?;
2427            return Ok(existing_id);
2428        }
2429
2430        let prior_head =
2431            find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
2432                .await?;
2433
2434        // Cycle guard: depth cap also bounds cycles. Run inside the transaction using sqlx
2435        // directly to avoid a second pool acquire (SQLite pool is typically size 1 in tests).
2436        if let Some(head_id) = prior_head {
2437            check_supersede_depth_in_tx(&mut tx, head_id).await?;
2438        }
2439
2440        // Expire the prior head BEFORE inserting the new row so that the partial unique index
2441        // uq_graph_edges_active_head is not violated. SQLite enforces unique indexes at statement
2442        // level, not at transaction commit, so the deactivation must precede the INSERT.
2443        if let Some(head_id) = prior_head {
2444            expire_prior_head(&mut tx, head_id).await?;
2445        }
2446
2447        let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2448        let new_id = insert_new_edge(
2449            &mut tx,
2450            source_entity_id,
2451            target_entity_id,
2452            relation,
2453            canonical_relation,
2454            fact,
2455            confidence,
2456            episode_raw,
2457            edge_type_str,
2458            supersedes_val,
2459        )
2460        .await?;
2461
2462        if let Some(head_id) = prior_head {
2463            set_superseded_by(&mut tx, head_id, new_id).await?;
2464            if let Some(m) = metrics {
2465                m.supersedes_total
2466                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2467            }
2468        }
2469
2470        tx.commit().await?;
2471        Ok(new_id)
2472    }
2473
2474    /// Convenience wrapper: calls [`Self::insert_or_supersede_with_metrics`] with `metrics = None`.
2475    ///
2476    /// # Errors
2477    ///
2478    /// Propagates errors from [`Self::insert_or_supersede_with_metrics`].
2479    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2480    pub async fn insert_or_supersede(
2481        &self,
2482        source_entity_id: i64,
2483        target_entity_id: i64,
2484        relation: &str,
2485        canonical_relation: &str,
2486        fact: &str,
2487        confidence: f32,
2488        episode_id: Option<MessageId>,
2489        edge_type: EdgeType,
2490        set_supersedes: bool,
2491    ) -> Result<i64, MemoryError> {
2492        self.insert_or_supersede_with_metrics(
2493            source_entity_id,
2494            target_entity_id,
2495            relation,
2496            canonical_relation,
2497            fact,
2498            confidence,
2499            episode_id,
2500            edge_type,
2501            set_supersedes,
2502            None,
2503        )
2504        .await
2505    }
2506
2507    /// Walk the `supersedes` chain from `head_id` using a single recursive CTE and return its depth.
2508    ///
2509    /// Returns `0` when the edge has no `supersedes` pointer (it is the root).
2510    /// The CTE is capped at `SUPERSEDE_DEPTH_CAP + 1` to prevent unbounded recursion.
2511    ///
2512    /// # Errors
2513    ///
2514    /// Returns [`MemoryError::SupersedeCycle`] when the CTE detects a cycle (depth exceeds cap
2515    /// but the chain has not terminated), or a database error on query failure.
2516    pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2517        Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2518    }
2519
2520    async fn check_supersede_depth_with_pool(
2521        pool: &zeph_db::DbPool,
2522        head_id: i64,
2523    ) -> Result<usize, MemoryError> {
2524        let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2525        // CTE walks the supersedes chain starting at head_id (depth=0).
2526        // Each hop to a superseded ancestor increments depth. NULL supersedes terminates the walk.
2527        let depth: Option<i64> = zeph_db::query_scalar(sql!(
2528            "WITH RECURSIVE chain(id, depth) AS (
2529               SELECT id, 0 FROM graph_edges WHERE id = ?
2530               UNION ALL
2531               SELECT e.supersedes, c.depth + 1
2532               FROM graph_edges e JOIN chain c ON e.id = c.id
2533               WHERE e.supersedes IS NOT NULL AND c.depth < ?
2534             )
2535             SELECT MAX(depth) FROM chain"
2536        ))
2537        .bind(head_id)
2538        .bind(cap)
2539        .fetch_optional(pool)
2540        .await?
2541        .flatten();
2542
2543        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2544        let d = depth.unwrap_or(0) as usize;
2545        if d > SUPERSEDE_DEPTH_CAP {
2546            return Err(MemoryError::SupersedeCycle(head_id));
2547        }
2548        Ok(d)
2549    }
2550
2551    // ── MemCoT provenance helpers (issue #3574 / #3575) ───────────────────────
2552
2553    /// Fetch `(edge_id, source_message_id)` pairs for a batch of edge ids.
2554    ///
2555    /// Returns only rows where `source_message_id` (`episode_id` column) is not NULL.
2556    /// Called from [`crate::semantic::SemanticMemory::recall_graph_view`] for Zoom-In provenance.
2557    ///
2558    /// # Errors
2559    ///
2560    /// Returns [`crate::error::MemoryError`] if the SQL query fails.
2561    pub async fn source_message_ids_for_edges(
2562        &self,
2563        edge_ids: &[i64],
2564    ) -> Result<Vec<(i64, crate::types::MessageId)>, crate::error::MemoryError> {
2565        if edge_ids.is_empty() {
2566            return Ok(Vec::new());
2567        }
2568        let placeholders = placeholder_list(1, edge_ids.len());
2569        let sql = format!(
2570            "SELECT id, episode_id FROM graph_edges \
2571             WHERE id IN ({placeholders}) AND episode_id IS NOT NULL"
2572        );
2573        let mut q = zeph_db::query_as::<_, (i64, crate::types::MessageId)>(&sql);
2574        for &eid in edge_ids {
2575            q = q.bind(eid);
2576        }
2577        let rows = q.fetch_all(&self.pool).await?;
2578        Ok(rows)
2579    }
2580
2581    /// Return the `source_entity_id` for a given edge id, or `None` if not found.
2582    ///
2583    /// Used by `recall_graph_view` with `ZoomOut` to seed the 1-hop BFS.
2584    ///
2585    /// # Errors
2586    ///
2587    /// Returns [`crate::error::MemoryError`] if the SQL query fails.
2588    pub async fn source_entity_id_for_edge(
2589        &self,
2590        edge_id: i64,
2591    ) -> Result<Option<i64>, crate::error::MemoryError> {
2592        let row: Option<i64> = zeph_db::query_scalar(sql!(
2593            "SELECT source_entity_id FROM graph_edges WHERE id = ?1 AND valid_to IS NULL LIMIT 1"
2594        ))
2595        .bind(edge_id)
2596        .fetch_optional(&self.pool)
2597        .await?;
2598        Ok(row)
2599    }
2600
2601    /// Fetch all active edges exactly 1 hop away from `entity_id`, filtered by `edge_types`.
2602    ///
2603    /// Returns a flat list of [`crate::recall_view::RecalledFact`] wrapping `GraphFact` values
2604    /// with `hop_distance = 1`. Used by `recall_graph_view` for `ZoomOut` neighbor expansion.
2605    ///
2606    /// # Errors
2607    ///
2608    /// Returns [`crate::error::MemoryError`] if the SQL query or entity name lookup fails.
2609    pub async fn bfs_edges_at_depth(
2610        &self,
2611        entity_id: i64,
2612        _depth: u32,
2613        edge_types: &[crate::graph::types::EdgeType],
2614    ) -> Result<Vec<crate::recall_view::RecalledFact>, crate::error::MemoryError> {
2615        let type_strs: Vec<&str> = if edge_types.is_empty() {
2616            vec!["semantic", "temporal", "causal", "entity"]
2617        } else {
2618            edge_types.iter().map(|et| et.as_str()).collect()
2619        };
2620
2621        let ph = placeholder_list(1, type_strs.len());
2622        let src_pos = type_strs.len() + 1;
2623        let tgt_pos = src_pos + 1;
2624        let src_ph = numbered_placeholder(src_pos);
2625        let tgt_ph = numbered_placeholder(tgt_pos);
2626
2627        let sql = format!(
2628            "SELECT ge.id, ge.source_entity_id, ge.target_entity_id, ge.relation, ge.fact,
2629                    ge.confidence, ge.valid_from, ge.valid_to, ge.created_at, ge.expired_at,
2630                    ge.episode_id, ge.qdrant_point_id, ge.edge_type, ge.retrieval_count,
2631                    ge.last_retrieved_at, ge.superseded_by, ge.canonical_relation, ge.supersedes, ge.weight
2632             FROM graph_edges ge
2633             WHERE ge.edge_type IN ({ph})
2634               AND ge.valid_to IS NULL
2635               AND (ge.source_entity_id = {src_ph} OR ge.target_entity_id = {tgt_ph})
2636             LIMIT 200"
2637        );
2638
2639        let mut q = zeph_db::query_as::<_, EdgeRow>(&sql);
2640        for t in &type_strs {
2641            q = q.bind(*t);
2642        }
2643        q = q.bind(entity_id).bind(entity_id);
2644
2645        let rows = q.fetch_all(&self.pool).await?;
2646
2647        // Resolve entity names from ids.
2648        let all_ids: Vec<i64> = rows
2649            .iter()
2650            .flat_map(|r| [r.source_entity_id, r.target_entity_id])
2651            .collect::<std::collections::HashSet<_>>()
2652            .into_iter()
2653            .collect();
2654
2655        let mut name_map: std::collections::HashMap<i64, String> = std::collections::HashMap::new();
2656        for chunk in all_ids.chunks(490) {
2657            let ph2 = placeholder_list(1, chunk.len());
2658            let name_sql =
2659                format!("SELECT id, canonical_name FROM graph_entities WHERE id IN ({ph2})");
2660            let mut nq = zeph_db::query_as::<_, (i64, String)>(&name_sql);
2661            for &id in chunk {
2662                nq = nq.bind(id);
2663            }
2664            let name_rows = nq.fetch_all(&self.pool).await?;
2665            for (id, name) in name_rows {
2666                name_map.insert(id, name);
2667            }
2668        }
2669
2670        let facts: Vec<crate::recall_view::RecalledFact> = rows
2671            .into_iter()
2672            .filter_map(|row| {
2673                let edge = edge_from_row(row);
2674                let entity_name = name_map.get(&edge.source_entity_id).cloned()?;
2675                let target_name = name_map.get(&edge.target_entity_id).cloned()?;
2676                if entity_name.is_empty() || target_name.is_empty() {
2677                    return None;
2678                }
2679                let fact = crate::graph::types::GraphFact {
2680                    entity_name,
2681                    relation: edge.canonical_relation.clone(),
2682                    target_name,
2683                    fact: edge.fact.clone(),
2684                    entity_match_score: 0.5,
2685                    hop_distance: 1,
2686                    confidence: edge.confidence,
2687                    valid_from: if edge.valid_from.is_empty() {
2688                        None
2689                    } else {
2690                        Some(edge.valid_from.clone())
2691                    },
2692                    edge_type: edge.edge_type,
2693                    retrieval_count: edge.retrieval_count,
2694                    edge_id: Some(edge.id),
2695                };
2696                Some(crate::recall_view::RecalledFact::from_graph_fact(fact))
2697            })
2698            .collect();
2699
2700        Ok(facts)
2701    }
2702}
2703
2704// ── insert_or_supersede helpers ───────────────────────────────────────────────
2705// All helpers take `&mut zeph_db::DbTransaction` so they run inside the caller's transaction
2706// without acquiring a second connection (SQLite pool is typically size 1 in tests).
2707
2708type Tx<'a> = zeph_db::DbTransaction<'a>;
2709
2710/// Look up a byte-identical active edge (FR-015 reassertion path).
2711///
2712/// Returns the existing edge ID when all columns match exactly, or `None` otherwise.
2713async fn find_identical_active_edge(
2714    tx: &mut Tx<'_>,
2715    src: i64,
2716    tgt: i64,
2717    canon: &str,
2718    edge_type_str: &str,
2719    fact: &str,
2720) -> Result<Option<i64>, MemoryError> {
2721    Ok(zeph_db::query_scalar(sql!(
2722        "SELECT id FROM graph_edges
2723         WHERE source_entity_id = ?
2724           AND target_entity_id = ?
2725           AND canonical_relation = ?
2726           AND edge_type = ?
2727           AND fact = ?
2728           AND valid_to IS NULL
2729           AND expired_at IS NULL
2730         LIMIT 1"
2731    ))
2732    .bind(src)
2733    .bind(tgt)
2734    .bind(canon)
2735    .bind(edge_type_str)
2736    .bind(fact)
2737    .fetch_optional(&mut **tx)
2738    .await?)
2739}
2740
2741/// Record a reassertion event for an existing edge (FR-015).
2742async fn record_reassertion(
2743    tx: &mut Tx<'_>,
2744    head_id: i64,
2745    episode_raw: Option<i64>,
2746    confidence: f32,
2747) -> Result<(), MemoryError> {
2748    #[allow(clippy::cast_possible_wrap)]
2749    let asserted_at = std::time::SystemTime::now()
2750        .duration_since(std::time::UNIX_EPOCH)
2751        .unwrap_or_default()
2752        .as_secs() as i64;
2753    zeph_db::query(sql!(
2754        "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2755         VALUES (?, ?, ?, ?)"
2756    ))
2757    .bind(head_id)
2758    .bind(asserted_at)
2759    .bind(episode_raw)
2760    .bind(f64::from(confidence))
2761    .execute(&mut **tx)
2762    .await?;
2763    Ok(())
2764}
2765
2766/// Find the current active head edge for `(src, canonical_relation, edge_type)`.
2767async fn find_prior_active_head(
2768    tx: &mut Tx<'_>,
2769    src: i64,
2770    canon: &str,
2771    edge_type_str: &str,
2772) -> Result<Option<i64>, MemoryError> {
2773    Ok(zeph_db::query_scalar(sql!(
2774        "SELECT id FROM graph_edges
2775         WHERE source_entity_id = ?
2776           AND canonical_relation = ?
2777           AND edge_type = ?
2778           AND valid_to IS NULL
2779           AND expired_at IS NULL
2780         ORDER BY created_at DESC
2781         LIMIT 1"
2782    ))
2783    .bind(src)
2784    .bind(canon)
2785    .bind(edge_type_str)
2786    .fetch_optional(&mut **tx)
2787    .await?)
2788}
2789
2790/// Verify the supersede chain depth for `head_id` does not exceed [`SUPERSEDE_DEPTH_CAP`].
2791///
2792/// Returns `Err(MemoryError::SupersedeDepthExceeded)` when the cap would be exceeded.
2793async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2794    let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2795    let depth: Option<i64> = sqlx::query_scalar(
2796        "WITH RECURSIVE chain(id, depth) AS (
2797           SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2798           UNION ALL
2799           SELECT e.supersedes, c.depth + 1
2800           FROM graph_edges e JOIN chain c ON e.id = c.id
2801           WHERE e.supersedes IS NOT NULL AND c.depth < ?
2802         )
2803         SELECT MAX(depth) FROM chain",
2804    )
2805    .bind(head_id)
2806    .bind(cap)
2807    .fetch_optional(&mut **tx)
2808    .await?
2809    .flatten();
2810    let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2811    if d > SUPERSEDE_DEPTH_CAP {
2812        return Err(MemoryError::SupersedeDepthExceeded(head_id));
2813    }
2814    Ok(())
2815}
2816
2817/// Insert a new edge row and return its generated ID.
2818#[allow(clippy::too_many_arguments)]
2819async fn insert_new_edge(
2820    tx: &mut Tx<'_>,
2821    src: i64,
2822    tgt: i64,
2823    relation: &str,
2824    canonical_relation: &str,
2825    fact: &str,
2826    confidence: f32,
2827    episode_raw: Option<i64>,
2828    edge_type_str: &str,
2829    supersedes_val: Option<i64>,
2830) -> Result<i64, MemoryError> {
2831    Ok(zeph_db::query_scalar(sql!(
2832        "INSERT INTO graph_edges
2833         (source_entity_id, target_entity_id, relation, canonical_relation, fact,
2834          confidence, episode_id, edge_type, supersedes)
2835         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2836         RETURNING id"
2837    ))
2838    .bind(src)
2839    .bind(tgt)
2840    .bind(relation)
2841    .bind(canonical_relation)
2842    .bind(fact)
2843    .bind(f64::from(confidence))
2844    .bind(episode_raw)
2845    .bind(edge_type_str)
2846    .bind(supersedes_val)
2847    .fetch_one(&mut **tx)
2848    .await?)
2849}
2850
2851/// Deactivate the prior head by setting `valid_to` and `expired_at`, leaving `superseded_by`
2852/// unset. Must be called **before** inserting the new row to avoid violating
2853/// `uq_graph_edges_active_head` — the unique index is enforced at statement level, not at commit.
2854async fn expire_prior_head(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2855    zeph_db::query(sql!(
2856        "UPDATE graph_edges
2857         SET valid_to = CURRENT_TIMESTAMP,
2858             expired_at = CURRENT_TIMESTAMP
2859         WHERE id = ?"
2860    ))
2861    .bind(head_id)
2862    .execute(&mut **tx)
2863    .await?;
2864    Ok(())
2865}
2866
2867/// Back-fill `superseded_by` on the already-expired prior head after the new row has been
2868/// inserted and its ID is known.
2869async fn set_superseded_by(tx: &mut Tx<'_>, head_id: i64, new_id: i64) -> Result<(), MemoryError> {
2870    zeph_db::query(sql!(
2871        "UPDATE graph_edges SET superseded_by = ? WHERE id = ?"
2872    ))
2873    .bind(new_id)
2874    .bind(head_id)
2875    .execute(&mut **tx)
2876    .await?;
2877    Ok(())
2878}
2879
2880// ── FTS helpers ───────────────────────────────────────────────────────────────
2881
/// Row type for the UNION ALL FTS5 query in `find_entities_ranked`.
///
/// The tuple is positional and follows the column order selected by
/// `build_ranked_fts_sql`: `(id, name, canonical_name, entity_type, summary,
/// first_seen_at, last_seen_at, qdrant_point_id, fts_rank)`.
type EntityFtsRow = (
    // id
    i64,
    // name
    String,
    // canonical_name
    String,
    // entity_type, still in its textual form
    String,
    // summary
    Option<String>,
    // first_seen_at
    String,
    // last_seen_at
    String,
    // qdrant_point_id
    Option<String>,
    // fts_rank: raw, un-normalized score
    f64,
);
2894
2895/// Sanitize and tokenize `query` into an FTS5 query string.
2896///
2897/// Returns `None` when the input is empty or contains only FTS5 operator tokens,
2898/// indicating no search should be attempted.
2899fn build_fts_query(query: &str) -> Option<String> {
2900    const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
2901    let sanitized = sanitize_fts_query(query);
2902    if sanitized.is_empty() {
2903        return None;
2904    }
2905    let fts_query: String = sanitized
2906        .split_whitespace()
2907        .filter(|t| !FTS5_OPERATORS.contains(t))
2908        .map(|t| format!("{t}*"))
2909        .collect::<Vec<_>>()
2910        .join(" ");
2911    if fts_query.is_empty() {
2912        None
2913    } else {
2914        Some(fts_query)
2915    }
2916}
2917
2918/// Build the UNION ALL FTS5 SQL for entity ranked search.
2919///
2920/// The SQL selects entities by direct FTS5 match (negated BM25) and by alias LIKE match
2921/// (fixed score 0.5), then orders by score descending with a LIMIT applied outside.
2922fn build_ranked_fts_sql() -> String {
2923    format!(
2924        "SELECT * FROM ( \
2925             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2926                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2927                    -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
2928             FROM graph_entities_fts fts \
2929             JOIN graph_entities e ON e.id = fts.rowid \
2930             WHERE graph_entities_fts MATCH ? \
2931             UNION ALL \
2932             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2933                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2934                    0.5 AS fts_rank \
2935             FROM graph_entity_aliases a \
2936             JOIN graph_entities e ON e.id = a.entity_id \
2937             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
2938         ) \
2939         ORDER BY fts_rank DESC \
2940         LIMIT ?",
2941        <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2942    )
2943}
2944
2945/// Normalize FTS scores to `[0, 1]` and deduplicate by entity ID.
2946///
2947/// Deduplication keeps the first (highest-ranked) occurrence of each entity ID.
2948fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
2949    // Guard against div-by-zero when all scores are 0.
2950    let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
2951    let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
2952
2953    let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
2954    let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
2955    for (
2956        id,
2957        name,
2958        canonical_name,
2959        entity_type_str,
2960        summary,
2961        first_seen_at,
2962        last_seen_at,
2963        qdrant_point_id,
2964        raw_score,
2965    ) in rows
2966    {
2967        if !seen_ids.insert(id) {
2968            continue;
2969        }
2970        let entity_type = entity_type_str
2971            .parse()
2972            .unwrap_or(super::types::EntityType::Concept);
2973        let entity = Entity {
2974            id: EntityId(id),
2975            name,
2976            canonical_name,
2977            entity_type,
2978            summary,
2979            first_seen_at,
2980            last_seen_at,
2981            qdrant_point_id,
2982        };
2983        #[allow(clippy::cast_possible_truncation)]
2984        let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
2985        result.push((entity, normalized));
2986    }
2987    result
2988}
2989
2990// ── Tests ─────────────────────────────────────────────────────────────────────
2991
2992#[cfg(test)]
2993mod tests;