Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed knowledge graph store.
5//!
6//! [`GraphStore`] manages the `graph_entities`, `graph_edges`, `graph_entity_aliases`,
7//! `graph_communities`, and `graph_episodes` tables. It is the persistence layer for
8//! the MAGMA / GAAMA graph subsystem.
9
10use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::MessageId;
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
24/// SQLite-backed persistence layer for the knowledge graph.
25///
26/// All graph mutations go through this type: entity upserts, edge creation,
27/// alias recording, community assignment, and episode management.
28///
29/// Obtain an instance via [`GraphStore::new`] after running the `zeph-db` migrations.
30pub struct GraphStore {
31    pool: DbPool,
32}
33
34impl GraphStore {
35    /// Create a new `GraphStore` wrapping `pool`.
36    ///
37    /// The pool must come from a database with the `zeph-db` migrations applied.
38    #[must_use]
39    pub fn new(pool: DbPool) -> Self {
40        Self { pool }
41    }
42
43    /// Access the underlying database pool.
44    #[must_use]
45    pub fn pool(&self) -> &DbPool {
46        &self.pool
47    }
48
49    // ── Entities ─────────────────────────────────────────────────────────────
50
51    /// Insert or update an entity by `(canonical_name, entity_type)`.
52    ///
53    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
54    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
55    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
56    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the database query fails.
61    pub async fn upsert_entity(
62        &self,
63        surface_name: &str,
64        canonical_name: &str,
65        entity_type: EntityType,
66        summary: Option<&str>,
67    ) -> Result<i64, MemoryError> {
68        let type_str = entity_type.as_str();
69        let id: i64 = zeph_db::query_scalar(sql!(
70            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
71             VALUES (?, ?, ?, ?)
72             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
73               name = excluded.name,
74               summary = COALESCE(excluded.summary, summary),
75               last_seen_at = CURRENT_TIMESTAMP
76             RETURNING id"
77        ))
78        .bind(surface_name)
79        .bind(canonical_name)
80        .bind(type_str)
81        .bind(summary)
82        .fetch_one(&self.pool)
83        .await?;
84        Ok(id)
85    }
86
87    /// Find an entity by exact canonical name and type.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the database query fails.
92    pub async fn find_entity(
93        &self,
94        canonical_name: &str,
95        entity_type: EntityType,
96    ) -> Result<Option<Entity>, MemoryError> {
97        let type_str = entity_type.as_str();
98        let row: Option<EntityRow> = zeph_db::query_as(
99            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
100             FROM graph_entities
101             WHERE canonical_name = ? AND entity_type = ?"),
102        )
103        .bind(canonical_name)
104        .bind(type_str)
105        .fetch_optional(&self.pool)
106        .await?;
107        row.map(entity_from_row).transpose()
108    }
109
110    /// Find an entity by its numeric ID.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the database query fails.
115    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
116        let row: Option<EntityRow> = zeph_db::query_as(
117            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118             FROM graph_entities
119             WHERE id = ?"),
120        )
121        .bind(entity_id)
122        .fetch_optional(&self.pool)
123        .await?;
124        row.map(entity_from_row).transpose()
125    }
126
127    /// Update the `qdrant_point_id` for an entity.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the database query fails.
132    pub async fn set_entity_qdrant_point_id(
133        &self,
134        entity_id: i64,
135        point_id: &str,
136    ) -> Result<(), MemoryError> {
137        zeph_db::query(sql!(
138            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
139        ))
140        .bind(point_id)
141        .bind(entity_id)
142        .execute(&self.pool)
143        .await?;
144        Ok(())
145    }
146
147    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
148    ///
149    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
150    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
151    /// alias matches via a UNION query.
152    ///
153    /// # Behavioral note
154    ///
155    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
156    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
157    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
158    /// mid-word substrings are not matched. This is a known trade-off for index performance.
159    ///
160    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
161    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
162    /// causes noise in practice, add a minimum length guard at the call site.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the database query fails.
167    pub async fn find_entities_fuzzy(
168        &self,
169        query: &str,
170        limit: usize,
171    ) -> Result<Vec<Entity>, MemoryError> {
172        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
173        // prevents syntax errors when user input contains them as literal search terms
174        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
175        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
176        let query = &query[..query.floor_char_boundary(512)];
177        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
178        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
179        let sanitized = sanitize_fts_query(query);
180        if sanitized.is_empty() {
181            return Ok(vec![]);
182        }
183        let fts_query: String = sanitized
184            .split_whitespace()
185            .filter(|t| !FTS5_OPERATORS.contains(t))
186            .map(|t| format!("{t}*"))
187            .collect::<Vec<_>>()
188            .join(" ");
189        if fts_query.is_empty() {
190            return Ok(vec![]);
191        }
192
193        let limit = i64::try_from(limit)?;
194        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
195        // bm25() returns negative values; ORDER BY ASC puts best matches first.
196        let search_sql = format!(
197            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
198                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
199             FROM graph_entities_fts fts \
200             JOIN graph_entities e ON e.id = fts.rowid \
201             WHERE graph_entities_fts MATCH ? \
202             UNION \
203             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
204                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
205             FROM graph_entity_aliases a \
206             JOIN graph_entities e ON e.id = a.entity_id \
207             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
208             LIMIT ?",
209            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
210        );
211        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
212            .bind(&fts_query)
213            .bind(format!(
214                "%{}%",
215                query
216                    .trim()
217                    .replace('\\', "\\\\")
218                    .replace('%', "\\%")
219                    .replace('_', "\\_")
220            ))
221            .bind(limit)
222            .fetch_all(&self.pool)
223            .await?;
224        rows.into_iter()
225            .map(entity_from_row)
226            .collect::<Result<Vec<_>, _>>()
227    }
228
229    /// Flush the `SQLite` WAL to the main database file.
230    ///
231    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
232    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
233    /// visible to connections opened in future sessions.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the PRAGMA execution fails.
238    #[cfg(feature = "sqlite")]
239    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
240        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
241            .execute(&self.pool)
242            .await?;
243        Ok(())
244    }
245
246    /// No-op on `PostgreSQL` (WAL management is handled by the server).
247    ///
248    /// # Errors
249    ///
250    /// Never returns an error.
251    #[cfg(feature = "postgres")]
252    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
253        Ok(())
254    }
255
256    /// Stream all entities from the database incrementally (true cursor, no full-table load).
257    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
258        use futures::StreamExt as _;
259        zeph_db::query_as::<_, EntityRow>(
260            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
261             FROM graph_entities ORDER BY id ASC"),
262        )
263        .fetch(&self.pool)
264        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
265            r.map_err(MemoryError::from).and_then(entity_from_row)
266        })
267    }
268
269    // ── Alias methods ─────────────────────────────────────────────────────────
270
271    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the database query fails.
276    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
277        let insert_alias_sql = format!(
278            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
279            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
280            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
281        );
282        zeph_db::query(&insert_alias_sql)
283            .bind(entity_id)
284            .bind(alias_name)
285            .execute(&self.pool)
286            .await?;
287        Ok(())
288    }
289
290    /// Find an entity by alias name and entity type (case-insensitive).
291    ///
292    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the database query fails.
297    pub async fn find_entity_by_alias(
298        &self,
299        alias_name: &str,
300        entity_type: EntityType,
301    ) -> Result<Option<Entity>, MemoryError> {
302        let type_str = entity_type.as_str();
303        let alias_typed_sql = format!(
304            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
305                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
306             FROM graph_entity_aliases a \
307             JOIN graph_entities e ON e.id = a.entity_id \
308             WHERE a.alias_name = ? {} \
309               AND e.entity_type = ? \
310             ORDER BY e.id ASC \
311             LIMIT 1",
312            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
313        );
314        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
315            .bind(alias_name)
316            .bind(type_str)
317            .fetch_optional(&self.pool)
318            .await?;
319        row.map(entity_from_row).transpose()
320    }
321
322    /// Get all aliases for an entity.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the database query fails.
327    pub async fn aliases_for_entity(
328        &self,
329        entity_id: i64,
330    ) -> Result<Vec<EntityAlias>, MemoryError> {
331        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
332            "SELECT id, entity_id, alias_name, created_at
333             FROM graph_entity_aliases
334             WHERE entity_id = ?
335             ORDER BY id ASC"
336        ))
337        .bind(entity_id)
338        .fetch_all(&self.pool)
339        .await?;
340        Ok(rows.into_iter().map(alias_from_row).collect())
341    }
342
343    /// Collect all entities into a Vec.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the database query fails or `entity_type` parsing fails.
348    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
349        use futures::TryStreamExt as _;
350        self.all_entities_stream().try_collect().await
351    }
352
353    /// Count the total number of entities.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the database query fails.
358    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
359        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
360            .fetch_one(&self.pool)
361            .await?;
362        Ok(count)
363    }
364
365    // ── Edges ─────────────────────────────────────────────────────────────────
366
367    /// Insert a new edge between two entities, or update the existing active edge.
368    ///
369    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
370    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
371    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
372    /// duplicate edges from repeated extraction of the same context messages.
373    ///
374    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
375    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
376    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
377    /// update the first and lose the type classification.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the database query fails.
382    pub async fn insert_edge(
383        &self,
384        source_entity_id: i64,
385        target_entity_id: i64,
386        relation: &str,
387        fact: &str,
388        confidence: f32,
389        episode_id: Option<MessageId>,
390    ) -> Result<i64, MemoryError> {
391        self.insert_edge_typed(
392            source_entity_id,
393            target_entity_id,
394            relation,
395            fact,
396            confidence,
397            episode_id,
398            EdgeType::Semantic,
399        )
400        .await
401    }
402
403    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
404    ///
405    /// Identical semantics to [`Self::insert_edge`] but with an explicit `edge_type` parameter.
406    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if the database query fails.
411    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
412    pub async fn insert_edge_typed(
413        &self,
414        source_entity_id: i64,
415        target_entity_id: i64,
416        relation: &str,
417        fact: &str,
418        confidence: f32,
419        episode_id: Option<MessageId>,
420        edge_type: EdgeType,
421    ) -> Result<i64, MemoryError> {
422        if source_entity_id == target_entity_id {
423            return Err(MemoryError::InvalidInput(format!(
424                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
425            )));
426        }
427        let confidence = confidence.clamp(0.0, 1.0);
428        let edge_type_str = edge_type.as_str();
429
430        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
431        // between existence check and write. The unique partial index uq_graph_edges_active
432        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
433        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
434        let mut tx = zeph_db::begin(&self.pool).await?;
435
436        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
437            "SELECT id, confidence FROM graph_edges
438             WHERE source_entity_id = ?
439               AND target_entity_id = ?
440               AND relation = ?
441               AND edge_type = ?
442               AND valid_to IS NULL
443             LIMIT 1"
444        ))
445        .bind(source_entity_id)
446        .bind(target_entity_id)
447        .bind(relation)
448        .bind(edge_type_str)
449        .fetch_optional(&mut *tx)
450        .await?;
451
452        if let Some((existing_id, stored_conf)) = existing {
453            let updated_conf = f64::from(confidence).max(stored_conf);
454            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
455                .bind(updated_conf)
456                .bind(existing_id)
457                .execute(&mut *tx)
458                .await?;
459            tx.commit().await?;
460            return Ok(existing_id);
461        }
462
463        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
464        let id: i64 = zeph_db::query_scalar(sql!(
465            "INSERT INTO graph_edges
466             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
467             VALUES (?, ?, ?, ?, ?, ?, ?)
468             RETURNING id"
469        ))
470        .bind(source_entity_id)
471        .bind(target_entity_id)
472        .bind(relation)
473        .bind(fact)
474        .bind(f64::from(confidence))
475        .bind(episode_raw)
476        .bind(edge_type_str)
477        .fetch_one(&mut *tx)
478        .await?;
479        tx.commit().await?;
480        Ok(id)
481    }
482
483    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
484    ///
485    /// # Errors
486    ///
487    /// Returns an error if the database update fails.
488    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
489        zeph_db::query(sql!(
490            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
491             WHERE id = ?"
492        ))
493        .bind(edge_id)
494        .execute(&self.pool)
495        .await?;
496        Ok(())
497    }
498
499    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
500    ///
501    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if the database update fails.
506    pub async fn invalidate_edge_with_supersession(
507        &self,
508        old_edge_id: i64,
509        new_edge_id: i64,
510    ) -> Result<(), MemoryError> {
511        zeph_db::query(sql!(
512            "UPDATE graph_edges
513             SET valid_to = CURRENT_TIMESTAMP,
514                 expired_at = CURRENT_TIMESTAMP,
515                 superseded_by = ?
516             WHERE id = ?"
517        ))
518        .bind(new_edge_id)
519        .bind(old_edge_id)
520        .execute(&self.pool)
521        .await?;
522        Ok(())
523    }
524
525    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
526    ///
527    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
528    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
529    /// in v1 — see `bfs_at_timestamp` for historical traversal).
530    ///
531    /// # `SQLite` bind limit safety
532    ///
533    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
534    /// default). Each entity ID requires two bind slots (source OR target), so batches are
535    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
536    /// compile-time `SQLite` configuration.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the database query fails.
541    pub async fn edges_for_entities(
542        &self,
543        entity_ids: &[i64],
544        edge_types: &[super::types::EdgeType],
545    ) -> Result<Vec<Edge>, MemoryError> {
546        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
547        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
548        // 490 * 2 = 980, leaving headroom for future query additions.
549        const MAX_BATCH_ENTITIES: usize = 490;
550
551        let mut all_edges: Vec<Edge> = Vec::new();
552
553        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
554            let edges = self.query_batch_edges(chunk, edge_types).await?;
555            all_edges.extend(edges);
556        }
557
558        Ok(all_edges)
559    }
560
561    /// Query active edges for a single chunk of entity IDs (internal helper).
562    ///
563    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if any database query fails.
568    async fn query_batch_edges(
569        &self,
570        entity_ids: &[i64],
571        edge_types: &[super::types::EdgeType],
572    ) -> Result<Vec<Edge>, MemoryError> {
573        if entity_ids.is_empty() {
574            return Ok(Vec::new());
575        }
576
577        // Build a parameterized IN clause with backend-appropriate placeholders.
578        // We cannot use the sql! macro here because the placeholder count is dynamic.
579        let n_ids = entity_ids.len();
580        let n_types = edge_types.len();
581
582        let sql = if n_types == 0 {
583            // placeholders used twice (source IN and target IN)
584            let placeholders = placeholder_list(1, n_ids);
585            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
586            format!(
587                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
588                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
589                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
590                 FROM graph_edges
591                 WHERE valid_to IS NULL
592                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
593            )
594        } else {
595            let placeholders = placeholder_list(1, n_ids);
596            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
597            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
598            format!(
599                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
600                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
601                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
602                 FROM graph_edges
603                 WHERE valid_to IS NULL
604                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
605                   AND edge_type IN ({type_placeholders})"
606            )
607        };
608
609        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
610        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
611        for id in entity_ids {
612            query = query.bind(*id);
613        }
614        for id in entity_ids {
615            query = query.bind(*id);
616        }
617        for et in edge_types {
618            query = query.bind(et.as_str());
619        }
620
621        // Wrap pool.acquire() + query execution in a short timeout to prevent the outer
622        // tokio::time::timeout (in SemanticMemory recall) from cancelling a mid-acquire
623        // future, which causes sqlx 0.8 semaphore count drift and permanent pool starvation.
624        let rows: Vec<EdgeRow> = tokio::time::timeout(
625            std::time::Duration::from_millis(500),
626            query.fetch_all(&self.pool),
627        )
628        .await
629        .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
630        Ok(rows.into_iter().map(edge_from_row).collect())
631    }
632
633    /// Get all active edges where entity is source or target.
634    ///
635    /// # Errors
636    ///
637    /// Returns an error if the database query fails.
638    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
639        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
640            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
641                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
642                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
643             FROM graph_edges
644             WHERE valid_to IS NULL
645               AND (source_entity_id = ? OR target_entity_id = ?)"
646        ))
647        .bind(entity_id)
648        .bind(entity_id)
649        .fetch_all(&self.pool)
650        .await?;
651        Ok(rows.into_iter().map(edge_from_row).collect())
652    }
653
654    /// Get all edges (active and expired) where entity is source or target, ordered by
655    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
656    ///
657    /// # Errors
658    ///
659    /// Returns an error if the database query fails or if `limit` overflows `i64`.
660    pub async fn edge_history_for_entity(
661        &self,
662        entity_id: i64,
663        limit: usize,
664    ) -> Result<Vec<Edge>, MemoryError> {
665        let limit = i64::try_from(limit)?;
666        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
667            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
668                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
669                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
670             FROM graph_edges
671             WHERE source_entity_id = ? OR target_entity_id = ?
672             ORDER BY valid_from DESC
673             LIMIT ?"
674        ))
675        .bind(entity_id)
676        .bind(entity_id)
677        .bind(limit)
678        .fetch_all(&self.pool)
679        .await?;
680        Ok(rows.into_iter().map(edge_from_row).collect())
681    }
682
683    /// Get all active edges between two entities (both directions).
684    ///
685    /// # Errors
686    ///
687    /// Returns an error if the database query fails.
688    pub async fn edges_between(
689        &self,
690        entity_a: i64,
691        entity_b: i64,
692    ) -> Result<Vec<Edge>, MemoryError> {
693        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
694            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
695                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
696                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
697             FROM graph_edges
698             WHERE valid_to IS NULL
699               AND ((source_entity_id = ? AND target_entity_id = ?)
700                 OR (source_entity_id = ? AND target_entity_id = ?))"
701        ))
702        .bind(entity_a)
703        .bind(entity_b)
704        .bind(entity_b)
705        .bind(entity_a)
706        .fetch_all(&self.pool)
707        .await?;
708        Ok(rows.into_iter().map(edge_from_row).collect())
709    }
710
711    /// Get active edges from `source` to `target` in the exact direction (no reverse).
712    ///
713    /// # Errors
714    ///
715    /// Returns an error if the database query fails.
716    pub async fn edges_exact(
717        &self,
718        source_entity_id: i64,
719        target_entity_id: i64,
720    ) -> Result<Vec<Edge>, MemoryError> {
721        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
722            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
723                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
724                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
725             FROM graph_edges
726             WHERE valid_to IS NULL
727               AND source_entity_id = ?
728               AND target_entity_id = ?"
729        ))
730        .bind(source_entity_id)
731        .bind(target_entity_id)
732        .fetch_all(&self.pool)
733        .await?;
734        Ok(rows.into_iter().map(edge_from_row).collect())
735    }
736
737    /// Count active (non-invalidated) edges.
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if the database query fails.
742    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
743        let count: i64 = zeph_db::query_scalar(sql!(
744            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
745        ))
746        .fetch_one(&self.pool)
747        .await?;
748        Ok(count)
749    }
750
751    /// Return per-type active edge counts as `(edge_type, count)` pairs.
752    ///
753    /// # Errors
754    ///
755    /// Returns an error if the database query fails.
756    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
757        let rows: Vec<(String, i64)> = zeph_db::query_as(
758            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
759        )
760        .fetch_all(&self.pool)
761        .await?;
762        Ok(rows)
763    }
764
765    // ── Communities ───────────────────────────────────────────────────────────
766
767    /// Insert or update a community by name.
768    ///
769    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
770    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
771    /// `assign_to_community` adds an entity without a full re-detection pass).
772    ///
773    /// # Errors
774    ///
775    /// Returns an error if the database query fails or JSON serialization fails.
776    pub async fn upsert_community(
777        &self,
778        name: &str,
779        summary: &str,
780        entity_ids: &[i64],
781        fingerprint: Option<&str>,
782    ) -> Result<i64, MemoryError> {
783        let entity_ids_json = serde_json::to_string(entity_ids)?;
784        let id: i64 = zeph_db::query_scalar(sql!(
785            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
786             VALUES (?, ?, ?, ?)
787             ON CONFLICT(name) DO UPDATE SET
788               summary = excluded.summary,
789               entity_ids = excluded.entity_ids,
790               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
791               updated_at = CURRENT_TIMESTAMP
792             RETURNING id"
793        ))
794        .bind(name)
795        .bind(summary)
796        .bind(entity_ids_json)
797        .bind(fingerprint)
798        .fetch_one(&self.pool)
799        .await?;
800        Ok(id)
801    }
802
803    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
804    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
805    ///
806    /// # Errors
807    ///
808    /// Returns an error if the database query fails.
809    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
810        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
811            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
812        ))
813        .fetch_all(&self.pool)
814        .await?;
815        Ok(rows.into_iter().collect())
816    }
817
818    /// Delete a single community by its primary key.
819    ///
820    /// # Errors
821    ///
822    /// Returns an error if the database query fails.
823    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
824        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
825            .bind(id)
826            .execute(&self.pool)
827            .await?;
828        Ok(())
829    }
830
831    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
832    ///
833    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
834    /// ensuring the next `detect_communities` run re-summarizes the affected community.
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if the database query fails.
839    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
840        zeph_db::query(sql!(
841            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
842        ))
843        .bind(id)
844        .execute(&self.pool)
845        .await?;
846        Ok(())
847    }
848
849    /// Find the first community that contains the given `entity_id`.
850    ///
851    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
852    /// table scan with per-row JSON parsing.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if the database query fails or JSON parsing fails.
857    pub async fn community_for_entity(
858        &self,
859        entity_id: i64,
860    ) -> Result<Option<Community>, MemoryError> {
861        let row: Option<CommunityRow> = zeph_db::query_as(
862            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
863             FROM graph_communities c, json_each(c.entity_ids) j
864             WHERE CAST(j.value AS INTEGER) = ?
865             LIMIT 1"),
866        )
867        .bind(entity_id)
868        .fetch_optional(&self.pool)
869        .await?;
870        match row {
871            Some(row) => {
872                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
873                Ok(Some(Community {
874                    id: row.id,
875                    name: row.name,
876                    summary: row.summary,
877                    entity_ids,
878                    fingerprint: row.fingerprint,
879                    created_at: row.created_at,
880                    updated_at: row.updated_at,
881                }))
882            }
883            None => Ok(None),
884        }
885    }
886
887    /// Get all communities.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the database query fails or JSON parsing fails.
892    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
893        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
894            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
895             FROM graph_communities
896             ORDER BY id ASC"
897        ))
898        .fetch_all(&self.pool)
899        .await?;
900
901        rows.into_iter()
902            .map(|row| {
903                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
904                Ok(Community {
905                    id: row.id,
906                    name: row.name,
907                    summary: row.summary,
908                    entity_ids,
909                    fingerprint: row.fingerprint,
910                    created_at: row.created_at,
911                    updated_at: row.updated_at,
912                })
913            })
914            .collect()
915    }
916
917    /// Count the total number of communities.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the database query fails.
922    pub async fn community_count(&self) -> Result<i64, MemoryError> {
923        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
924            .fetch_one(&self.pool)
925            .await?;
926        Ok(count)
927    }
928
929    // ── Metadata ──────────────────────────────────────────────────────────────
930
931    /// Get a metadata value by key.
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the database query fails.
936    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
937        let val: Option<String> =
938            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
939                .bind(key)
940                .fetch_optional(&self.pool)
941                .await?;
942        Ok(val)
943    }
944
945    /// Set a metadata value by key (upsert).
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the database query fails.
950    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
951        zeph_db::query(sql!(
952            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
953             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
954        ))
955        .bind(key)
956        .bind(value)
957        .execute(&self.pool)
958        .await?;
959        Ok(())
960    }
961
962    /// Get the current extraction count from metadata.
963    ///
964    /// Returns 0 if the counter has not been initialized.
965    ///
966    /// # Errors
967    ///
968    /// Returns an error if the database query fails.
969    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
970        let val = self.get_metadata("extraction_count").await?;
971        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
972    }
973
974    /// Stream all active (non-invalidated) edges.
975    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
976        use futures::StreamExt as _;
977        zeph_db::query_as::<_, EdgeRow>(sql!(
978            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
979                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
980                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
981             FROM graph_edges
982             WHERE valid_to IS NULL
983             ORDER BY id ASC"
984        ))
985        .fetch(&self.pool)
986        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
987    }
988
989    /// Fetch a chunk of active edges using keyset pagination.
990    ///
991    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
992    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
993    /// the returned chunk as `after_id` for the next page. An empty result means all
994    /// edges have been consumed.
995    ///
996    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
997    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
998    /// and will appear in subsequent chunks or after the last chunk, never causing
999    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
1000    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if the database query fails.
1005    pub async fn edges_after_id(
1006        &self,
1007        after_id: i64,
1008        limit: i64,
1009    ) -> Result<Vec<Edge>, MemoryError> {
1010        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1011            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1012                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1013                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1014             FROM graph_edges
1015             WHERE valid_to IS NULL AND id > ?
1016             ORDER BY id ASC
1017             LIMIT ?"
1018        ))
1019        .bind(after_id)
1020        .bind(limit)
1021        .fetch_all(&self.pool)
1022        .await?;
1023        Ok(rows.into_iter().map(edge_from_row).collect())
1024    }
1025
1026    /// Find a community by its primary key.
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns an error if the database query fails or JSON parsing fails.
1031    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1032        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1033            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1034             FROM graph_communities
1035             WHERE id = ?"
1036        ))
1037        .bind(id)
1038        .fetch_optional(&self.pool)
1039        .await?;
1040        match row {
1041            Some(row) => {
1042                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1043                Ok(Some(Community {
1044                    id: row.id,
1045                    name: row.name,
1046                    summary: row.summary,
1047                    entity_ids,
1048                    fingerprint: row.fingerprint,
1049                    created_at: row.created_at,
1050                    updated_at: row.updated_at,
1051                }))
1052            }
1053            None => Ok(None),
1054        }
1055    }
1056
1057    /// Delete all communities (full rebuild before upsert).
1058    ///
1059    /// # Errors
1060    ///
1061    /// Returns an error if the database query fails.
1062    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1063        zeph_db::query(sql!("DELETE FROM graph_communities"))
1064            .execute(&self.pool)
1065            .await?;
1066        Ok(())
1067    }
1068
1069    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1070
1071    /// Find entities matching `query` and return them with normalized FTS5 scores.
1072    ///
1073    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1074    /// by dividing each negated BM25 value by the maximum in the result set.
1075    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1076    ///
1077    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1078    ///
1079    /// # Errors
1080    ///
1081    /// Returns an error if the database query fails.
1082    pub async fn find_entities_ranked(
1083        &self,
1084        query: &str,
1085        limit: usize,
1086    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1087        let query = &query[..query.floor_char_boundary(512)];
1088        let Some(fts_query) = build_fts_query(query) else {
1089            return Ok(vec![]);
1090        };
1091
1092        let limit_i64 = i64::try_from(limit)?;
1093        let ranked_fts_sql = build_ranked_fts_sql();
1094        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1095            .bind(&fts_query)
1096            .bind(format!(
1097                "%{}%",
1098                query
1099                    .trim()
1100                    .replace('\\', "\\\\")
1101                    .replace('%', "\\%")
1102                    .replace('_', "\\_")
1103            ))
1104            .bind(limit_i64)
1105            .fetch_all(&self.pool)
1106            .await?;
1107
1108        if rows.is_empty() {
1109            return Ok(vec![]);
1110        }
1111
1112        Ok(normalize_and_dedup(rows))
1113    }
1114
1115    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1116    ///
1117    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1118    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1119    /// Entities with no edges receive score `0.0`.
1120    ///
1121    /// # Errors
1122    ///
1123    /// Returns an error if the database query fails.
1124    pub async fn entity_structural_scores(
1125        &self,
1126        entity_ids: &[i64],
1127    ) -> Result<HashMap<i64, f32>, MemoryError> {
1128        // Each query binds entity_ids three times (three IN clauses).
1129        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1130        const MAX_BATCH: usize = 163;
1131
1132        if entity_ids.is_empty() {
1133            return Ok(HashMap::new());
1134        }
1135
1136        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1137        for chunk in entity_ids.chunks(MAX_BATCH) {
1138            let n = chunk.len();
1139            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1140            let ph1 = placeholder_list(1, n);
1141            let ph2 = placeholder_list(n + 1, n);
1142            let ph3 = placeholder_list(n * 2 + 1, n);
1143
1144            // Build query: count degree and distinct edge types for each entity.
1145            let sql = format!(
1146                "SELECT entity_id,
1147                        COUNT(*) AS degree,
1148                        COUNT(DISTINCT edge_type) AS type_diversity
1149                 FROM (
1150                     SELECT source_entity_id AS entity_id, edge_type
1151                     FROM graph_edges
1152                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1153                     UNION ALL
1154                     SELECT target_entity_id AS entity_id, edge_type
1155                     FROM graph_edges
1156                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1157                 )
1158                 WHERE entity_id IN ({ph3})
1159                 GROUP BY entity_id"
1160            );
1161
1162            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1163            // Bind chunk three times (three IN clauses)
1164            for id in chunk {
1165                query = query.bind(*id);
1166            }
1167            for id in chunk {
1168                query = query.bind(*id);
1169            }
1170            for id in chunk {
1171                query = query.bind(*id);
1172            }
1173
1174            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1175            all_rows.extend(chunk_rows);
1176        }
1177
1178        if all_rows.is_empty() {
1179            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1180        }
1181
1182        let max_degree = all_rows
1183            .iter()
1184            .map(|(_, d, _)| *d)
1185            .max()
1186            .unwrap_or(1)
1187            .max(1);
1188
1189        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1190        for (entity_id, degree, type_diversity) in all_rows {
1191            #[allow(clippy::cast_precision_loss)]
1192            let norm_degree = degree as f32 / max_degree as f32;
1193            #[allow(clippy::cast_precision_loss)]
1194            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1195            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1196            scores.insert(entity_id, score);
1197        }
1198
1199        Ok(scores)
1200    }
1201
1202    /// Look up community IDs for a batch of entity IDs.
1203    ///
1204    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1205    /// are absent from the map (treated as `None` by callers — no community cap applied).
1206    ///
1207    /// # Errors
1208    ///
1209    /// Returns an error if the database query fails.
1210    pub async fn entity_community_ids(
1211        &self,
1212        entity_ids: &[i64],
1213    ) -> Result<HashMap<i64, i64>, MemoryError> {
1214        const MAX_BATCH: usize = 490;
1215
1216        if entity_ids.is_empty() {
1217            return Ok(HashMap::new());
1218        }
1219
1220        let mut result: HashMap<i64, i64> = HashMap::new();
1221        for chunk in entity_ids.chunks(MAX_BATCH) {
1222            let placeholders = placeholder_list(1, chunk.len());
1223
1224            let community_sql = community_ids_sql(&placeholders);
1225            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1226            for id in chunk {
1227                query = query.bind(*id);
1228            }
1229
1230            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1231            result.extend(rows);
1232        }
1233
1234        Ok(result)
1235    }
1236
1237    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1238    ///
1239    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1240    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1241    ///
1242    /// # Errors
1243    ///
1244    /// Returns an error if the database query fails.
1245    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1246        const MAX_BATCH: usize = 490;
1247        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1248        for chunk in edge_ids.chunks(MAX_BATCH) {
1249            let edge_placeholders = placeholder_list(1, chunk.len());
1250            let retrieval_sql = format!(
1251                "UPDATE graph_edges \
1252                 SET retrieval_count = retrieval_count + 1, \
1253                     last_retrieved_at = {epoch_now} \
1254                 WHERE id IN ({edge_placeholders})"
1255            );
1256            let mut q = zeph_db::query(&retrieval_sql);
1257            for id in chunk {
1258                q = q.bind(*id);
1259            }
1260            q.execute(&self.pool).await?;
1261        }
1262        Ok(())
1263    }
1264
1265    /// Increment `weight` on the set of edges traversed during the current recall (HL-F2, #3344).
1266    ///
1267    /// Mirrors [`Self::record_edge_retrieval`] in shape: same `MAX_BATCH = 490` chunking,
1268    /// same `WHERE id IN (…) AND valid_to IS NULL` filter (defensive — traversed edges should
1269    /// already be active, but this prevents reinforcing tombstoned edges).
1270    ///
1271    /// No-op when `edge_ids` is empty or `delta == 0.0`.
1272    ///
1273    /// # Errors
1274    ///
1275    /// Returns an error if the underlying `UPDATE` fails.
1276    #[tracing::instrument(
1277        name = "memory.graph.hebbian_increment",
1278        skip_all,
1279        fields(edge_count = edge_ids.len())
1280    )]
1281    pub async fn apply_hebbian_increment(
1282        &self,
1283        edge_ids: &[i64],
1284        delta: f32,
1285    ) -> Result<(), MemoryError> {
1286        // MAX_BATCH chosen to stay under SQLite's 999 host-parameter limit
1287        // ($1 = delta, $2..$N+1 = edge ids — 1 + 490 = 491 params per chunk).
1288        const MAX_BATCH: usize = 490;
1289        if edge_ids.is_empty() || delta == 0.0 {
1290            return Ok(());
1291        }
1292        for chunk in edge_ids.chunks(MAX_BATCH) {
1293            let edge_placeholders = placeholder_list(2, chunk.len());
1294            let sql = format!(
1295                "UPDATE graph_edges \
1296                 SET weight = weight + $1 \
1297                 WHERE id IN ({edge_placeholders}) \
1298                   AND valid_to IS NULL"
1299            );
1300            let mut q = zeph_db::query(&sql);
1301            q = q.bind(f64::from(delta));
1302            for id in chunk {
1303                q = q.bind(*id);
1304            }
1305            q.execute(&self.pool).await?;
1306        }
1307        Ok(())
1308    }
1309
1310    /// Return the subset of `ids` that exist in `graph_entities`.
1311    ///
1312    /// Useful for cross-referencing Qdrant-side entity IDs against the `SQLite` truth.
1313    /// Processes in chunks of 490 to stay under the `SQLite` variable limit (~32 k).
1314    ///
1315    /// # Errors
1316    ///
1317    /// Returns an error if the database query fails.
1318    pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1319        const MAX_BATCH: usize = 490;
1320        if ids.is_empty() {
1321            return Ok(Vec::new());
1322        }
1323        // TODO: chunk when ids.len() > 5_000 to guard against extreme cases
1324        let mut result = Vec::with_capacity(ids.len());
1325        for chunk in ids.chunks(MAX_BATCH) {
1326            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1327            let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1328            let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1329            for id in chunk {
1330                q = q.bind(*id);
1331            }
1332            let rows = q.fetch_all(&self.pool).await?;
1333            for (id,) in rows {
1334                result.push(id);
1335            }
1336        }
1337        Ok(result)
1338    }
1339
1340    /// Resolve entity IDs to their Qdrant point IDs in a single batched `SELECT` (HL-F5, #3346).
1341    ///
1342    /// Entities without a `qdrant_point_id` are silently omitted from the result.
1343    ///
1344    /// # Errors
1345    ///
1346    /// Returns an error if the database query fails.
1347    pub async fn qdrant_point_ids_for_entities(
1348        &self,
1349        entity_ids: &[i64],
1350    ) -> Result<HashMap<i64, String>, MemoryError> {
1351        // Chunk to stay under SQLite variable limit.
1352        const MAX_BATCH: usize = 490;
1353        if entity_ids.is_empty() {
1354            return Ok(HashMap::new());
1355        }
1356        let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1357        for chunk in entity_ids.chunks(MAX_BATCH) {
1358            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1359            let sql = format!(
1360                "SELECT id, qdrant_point_id \
1361                 FROM graph_entities \
1362                 WHERE id IN ({placeholders}) \
1363                   AND qdrant_point_id IS NOT NULL"
1364            );
1365            let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1366            for id in chunk {
1367                q = q.bind(*id);
1368            }
1369            let rows = q.fetch_all(&self.pool).await?;
1370            for (entity_id, point_id) in rows {
1371                result.insert(entity_id, point_id);
1372            }
1373        }
1374        Ok(result)
1375    }
1376
1377    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1378    ///
1379    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1380    /// are updated. Returns the number of rows affected.
1381    ///
1382    /// # Errors
1383    ///
1384    /// Returns an error if the database query fails.
1385    pub async fn decay_edge_retrieval_counts(
1386        &self,
1387        decay_lambda: f64,
1388        interval_secs: u64,
1389    ) -> Result<usize, MemoryError> {
1390        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1391        let decay_raw = format!(
1392            "UPDATE graph_edges \
1393             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1394             WHERE valid_to IS NULL \
1395               AND retrieval_count > 0 \
1396               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1397        );
1398        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1399        let result = zeph_db::query(&decay_sql)
1400            .bind(decay_lambda)
1401            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1402            .execute(&self.pool)
1403            .await?;
1404        Ok(usize::try_from(result.rows_affected())?)
1405    }
1406
1407    /// Delete expired edges older than `retention_days` and return count deleted.
1408    ///
1409    /// # Errors
1410    ///
1411    /// Returns an error if the database query fails.
1412    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1413        let days = i64::from(retention_days);
1414        let result = zeph_db::query(sql!(
1415            "DELETE FROM graph_edges
1416             WHERE expired_at IS NOT NULL
1417               AND expired_at < datetime('now', '-' || ? || ' days')"
1418        ))
1419        .bind(days)
1420        .execute(&self.pool)
1421        .await?;
1422        Ok(usize::try_from(result.rows_affected())?)
1423    }
1424
1425    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1426    ///
1427    /// # Errors
1428    ///
1429    /// Returns an error if the database query fails.
1430    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1431        let days = i64::from(retention_days);
1432        let result = zeph_db::query(sql!(
1433            "DELETE FROM graph_entities
1434             WHERE id NOT IN (
1435                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1436                 UNION
1437                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1438             )
1439             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1440        ))
1441        .bind(days)
1442        .execute(&self.pool)
1443        .await?;
1444        Ok(usize::try_from(result.rows_affected())?)
1445    }
1446
1447    /// Delete the oldest excess entities when count exceeds `max_entities`.
1448    ///
1449    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1450    /// Only deletes when `entity_count() > max_entities`.
1451    ///
1452    /// # Errors
1453    ///
1454    /// Returns an error if the database query fails.
1455    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1456        let current = self.entity_count().await?;
1457        let max = i64::try_from(max_entities)?;
1458        if current <= max {
1459            return Ok(0);
1460        }
1461        let excess = current - max;
1462        let result = zeph_db::query(sql!(
1463            "DELETE FROM graph_entities
1464             WHERE id IN (
1465                 SELECT e.id
1466                 FROM graph_entities e
1467                 LEFT JOIN (
1468                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1469                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1470                     UNION ALL
1471                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1472                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1473                 ) edge_counts ON e.id = edge_counts.eid
1474                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1475                 LIMIT ?
1476             )"
1477        ))
1478        .bind(excess)
1479        .execute(&self.pool)
1480        .await?;
1481        Ok(usize::try_from(result.rows_affected())?)
1482    }
1483
1484    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1485
1486    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1487    ///
1488    /// An edge is valid at `timestamp` when:
1489    /// - `valid_from <= timestamp`, AND
1490    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1491    ///
1492    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1493    ///
1494    /// # Errors
1495    ///
1496    /// Returns an error if the database query fails.
1497    pub async fn edges_at_timestamp(
1498        &self,
1499        entity_id: i64,
1500        timestamp: &str,
1501    ) -> Result<Vec<Edge>, MemoryError> {
1502        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1503        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1504        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1505        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1506            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1507                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1508                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1509             FROM graph_edges
1510             WHERE valid_to IS NULL
1511               AND valid_from <= ?
1512               AND (source_entity_id = ? OR target_entity_id = ?)
1513             UNION ALL
1514             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1515                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1516                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1517             FROM graph_edges
1518             WHERE valid_to IS NOT NULL
1519               AND valid_from <= ?
1520               AND valid_to > ?
1521               AND (source_entity_id = ? OR target_entity_id = ?)"
1522        ))
1523        .bind(timestamp)
1524        .bind(entity_id)
1525        .bind(entity_id)
1526        .bind(timestamp)
1527        .bind(timestamp)
1528        .bind(entity_id)
1529        .bind(entity_id)
1530        .fetch_all(&self.pool)
1531        .await?;
1532        Ok(rows.into_iter().map(edge_from_row).collect())
1533    }
1534
1535    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1536    ///
1537    /// The optional `relation` filter restricts results to a specific relation label.
1538    /// Results are ordered by `valid_from DESC` (most recent first).
1539    ///
1540    /// # Errors
1541    ///
1542    /// Returns an error if the database query fails.
1543    pub async fn edge_history(
1544        &self,
1545        source_entity_id: i64,
1546        predicate: &str,
1547        relation: Option<&str>,
1548        limit: usize,
1549    ) -> Result<Vec<Edge>, MemoryError> {
1550        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1551        let escaped = predicate
1552            .replace('\\', "\\\\")
1553            .replace('%', "\\%")
1554            .replace('_', "\\_");
1555        let like_pattern = format!("%{escaped}%");
1556        let limit = i64::try_from(limit)?;
1557        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1558            zeph_db::query_as(sql!(
1559                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1560                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1561                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1562                 FROM graph_edges
1563                 WHERE source_entity_id = ?
1564                   AND fact LIKE ? ESCAPE '\\'
1565                   AND relation = ?
1566                 ORDER BY valid_from DESC
1567                 LIMIT ?"
1568            ))
1569            .bind(source_entity_id)
1570            .bind(&like_pattern)
1571            .bind(rel)
1572            .bind(limit)
1573            .fetch_all(&self.pool)
1574            .await?
1575        } else {
1576            zeph_db::query_as(sql!(
1577                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1578                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1579                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1580                 FROM graph_edges
1581                 WHERE source_entity_id = ?
1582                   AND fact LIKE ? ESCAPE '\\'
1583                 ORDER BY valid_from DESC
1584                 LIMIT ?"
1585            ))
1586            .bind(source_entity_id)
1587            .bind(&like_pattern)
1588            .bind(limit)
1589            .fetch_all(&self.pool)
1590            .await?
1591        };
1592        Ok(rows.into_iter().map(edge_from_row).collect())
1593    }
1594
1595    // ── BFS Traversal ─────────────────────────────────────────────────────────
1596
1597    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1598    ///
1599    /// Returns all reachable entities and the active edges connecting them.
1600    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1601    /// of `SQLite` CTE limitations.
1602    ///
1603    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1604    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1605    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1606    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1607    ///
1608    /// # Errors
1609    ///
1610    /// Returns an error if any database query fails.
1611    pub async fn bfs(
1612        &self,
1613        start_entity_id: i64,
1614        max_hops: u32,
1615    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1616        self.bfs_with_depth(start_entity_id, max_hops)
1617            .await
1618            .map(|(e, ed, _)| (e, ed))
1619    }
1620
1621    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1622    ///
1623    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1624    /// entity. The start entity itself has depth 0.
1625    ///
1626    /// **`SQLite` bind parameter limit**: see [`Self::bfs`] for notes on frontier size limits.
1627    ///
1628    /// # Errors
1629    ///
1630    /// Returns an error if any database query fails.
1631    pub async fn bfs_with_depth(
1632        &self,
1633        start_entity_id: i64,
1634        max_hops: u32,
1635    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1636        self.bfs_core(start_entity_id, max_hops, None).await
1637    }
1638
1639    /// BFS traversal considering only edges that were valid at `timestamp`.
1640    ///
1641    /// Equivalent to [`Self::bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1642    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1643    ///
1644    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1645    ///
1646    /// # Errors
1647    ///
1648    /// Returns an error if any database query fails.
1649    pub async fn bfs_at_timestamp(
1650        &self,
1651        start_entity_id: i64,
1652        max_hops: u32,
1653        timestamp: &str,
1654    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1655        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1656            .await
1657    }
1658
1659    /// BFS traversal scoped to specific MAGMA edge types.
1660    ///
1661    /// When `edge_types` is empty, behaves identically to [`Self::bfs_with_depth`] (traverses all
1662    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1663    /// matches one of the provided types.
1664    ///
1665    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1666    /// edges, a temporal query only temporal + semantic edges, etc.
1667    ///
1668    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1669    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1670    ///
1671    /// # Errors
1672    ///
1673    /// Returns an error if any database query fails.
1674    pub async fn bfs_typed(
1675        &self,
1676        start_entity_id: i64,
1677        max_hops: u32,
1678        edge_types: &[EdgeType],
1679    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1680        if edge_types.is_empty() {
1681            return self.bfs_with_depth(start_entity_id, max_hops).await;
1682        }
1683        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1684            .await
1685    }
1686
1687    /// Shared BFS implementation.
1688    ///
1689    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1690    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1691    ///
1692    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1693    /// format string, so there is no SQL injection risk.
1694    async fn bfs_core(
1695        &self,
1696        start_entity_id: i64,
1697        max_hops: u32,
1698        at_timestamp: Option<&str>,
1699    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1700        use std::collections::HashMap;
1701
1702        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1703        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1704        const MAX_FRONTIER: usize = 300;
1705
1706        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1707        let mut frontier: Vec<i64> = vec![start_entity_id];
1708        depth_map.insert(start_entity_id, 0);
1709
1710        for hop in 0..max_hops {
1711            if frontier.is_empty() {
1712                break;
1713            }
1714            frontier.truncate(MAX_FRONTIER);
1715            // IDs come from our own DB — no user input, no injection risk.
1716            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1717            // Timestamp (if any) follows at position 3n+1.
1718            let n = frontier.len();
1719            let ph1 = placeholder_list(1, n);
1720            let ph2 = placeholder_list(n + 1, n);
1721            let ph3 = placeholder_list(n * 2 + 1, n);
1722            let edge_filter = if at_timestamp.is_some() {
1723                let ts_pos = n * 3 + 1;
1724                format!(
1725                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1726                    ts = numbered_placeholder(ts_pos),
1727                )
1728            } else {
1729                "valid_to IS NULL".to_owned()
1730            };
1731            let neighbour_sql = format!(
1732                "SELECT DISTINCT CASE
1733                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1734                     ELSE source_entity_id
1735                 END as neighbour_id
1736                 FROM graph_edges
1737                 WHERE {edge_filter}
1738                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1739            );
1740            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1741            for id in &frontier {
1742                q = q.bind(*id);
1743            }
1744            for id in &frontier {
1745                q = q.bind(*id);
1746            }
1747            for id in &frontier {
1748                q = q.bind(*id);
1749            }
1750            if let Some(ts) = at_timestamp {
1751                q = q.bind(ts);
1752            }
1753            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1754            let mut next_frontier: Vec<i64> = Vec::new();
1755            for nbr in neighbours {
1756                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1757                    e.insert(hop + 1);
1758                    next_frontier.push(nbr);
1759                }
1760            }
1761            frontier = next_frontier;
1762        }
1763
1764        self.bfs_fetch_results(depth_map, at_timestamp).await
1765    }
1766
1767    /// BFS implementation scoped to specific edge types.
1768    ///
1769    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1770    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1771    ///
1772    /// # Errors
1773    ///
1774    /// Returns an error if any database query fails.
1775    async fn bfs_core_typed(
1776        &self,
1777        start_entity_id: i64,
1778        max_hops: u32,
1779        at_timestamp: Option<&str>,
1780        edge_types: &[EdgeType],
1781    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1782        use std::collections::HashMap;
1783
1784        const MAX_FRONTIER: usize = 300;
1785
1786        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1787
1788        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1789        let mut frontier: Vec<i64> = vec![start_entity_id];
1790        depth_map.insert(start_entity_id, 0);
1791
1792        let n_types = type_strs.len();
1793        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1794        let type_in = placeholder_list(1, n_types);
1795        let id_start = n_types + 1;
1796
1797        for hop in 0..max_hops {
1798            if frontier.is_empty() {
1799                break;
1800            }
1801            frontier.truncate(MAX_FRONTIER);
1802
1803            let n_frontier = frontier.len();
1804            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1805            let fp1 = placeholder_list(id_start, n_frontier);
1806            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1807            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1808
1809            let edge_filter = if at_timestamp.is_some() {
1810                let ts_pos = id_start + n_frontier * 3;
1811                format!(
1812                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1813                    ts = numbered_placeholder(ts_pos),
1814                )
1815            } else {
1816                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1817            };
1818
1819            let neighbour_sql = format!(
1820                "SELECT DISTINCT CASE
1821                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1822                     ELSE source_entity_id
1823                 END as neighbour_id
1824                 FROM graph_edges
1825                 WHERE {edge_filter}
1826                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1827            );
1828
1829            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1830            // Bind types first
1831            for t in &type_strs {
1832                q = q.bind(*t);
1833            }
1834            // Bind frontier 3 times
1835            for id in &frontier {
1836                q = q.bind(*id);
1837            }
1838            for id in &frontier {
1839                q = q.bind(*id);
1840            }
1841            for id in &frontier {
1842                q = q.bind(*id);
1843            }
1844            if let Some(ts) = at_timestamp {
1845                q = q.bind(ts);
1846            }
1847
1848            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1849            let mut next_frontier: Vec<i64> = Vec::new();
1850            for nbr in neighbours {
1851                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1852                    e.insert(hop + 1);
1853                    next_frontier.push(nbr);
1854                }
1855            }
1856            frontier = next_frontier;
1857        }
1858
1859        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1860        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1861            .await
1862    }
1863
1864    /// Fetch entities and typed edges for a completed BFS depth map.
1865    ///
1866    /// Filters returned edges by the provided `edge_type` strings.
1867    ///
1868    /// # Errors
1869    ///
1870    /// Returns an error if any database query fails.
1871    async fn bfs_fetch_results_typed(
1872        &self,
1873        depth_map: std::collections::HashMap<i64, u32>,
1874        at_timestamp: Option<&str>,
1875        type_strs: &[&str],
1876    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1877        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1878        if visited_ids.is_empty() {
1879            return Ok((Vec::new(), Vec::new(), depth_map));
1880        }
1881        if visited_ids.len() > 499 {
1882            tracing::warn!(
1883                total = visited_ids.len(),
1884                retained = 499,
1885                "bfs_fetch_results_typed: visited entity set truncated to 499"
1886            );
1887            visited_ids.truncate(499);
1888        }
1889
1890        let n_types = type_strs.len();
1891        let n_visited = visited_ids.len();
1892
1893        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1894        let type_in = placeholder_list(1, n_types);
1895        let id_start = n_types + 1;
1896        let ph_ids1 = placeholder_list(id_start, n_visited);
1897        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1898
1899        let edge_filter = if at_timestamp.is_some() {
1900            let ts_pos = id_start + n_visited * 2;
1901            format!(
1902                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1903                ts = numbered_placeholder(ts_pos),
1904            )
1905        } else {
1906            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1907        };
1908
1909        let edge_sql = format!(
1910            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1911                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1912                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1913             FROM graph_edges
1914             WHERE {edge_filter}
1915               AND source_entity_id IN ({ph_ids1})
1916               AND target_entity_id IN ({ph_ids2})"
1917        );
1918        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1919        for t in type_strs {
1920            edge_query = edge_query.bind(*t);
1921        }
1922        for id in &visited_ids {
1923            edge_query = edge_query.bind(*id);
1924        }
1925        for id in &visited_ids {
1926            edge_query = edge_query.bind(*id);
1927        }
1928        if let Some(ts) = at_timestamp {
1929            edge_query = edge_query.bind(ts);
1930        }
1931        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1932
1933        // For entity query, use plain sequential bind positions (no type prefix offset)
1934        let entity_sql2 = format!(
1935            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1936             FROM graph_entities WHERE id IN ({ph})",
1937            ph = placeholder_list(1, visited_ids.len()),
1938        );
1939        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1940        for id in &visited_ids {
1941            entity_query = entity_query.bind(*id);
1942        }
1943        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1944
1945        let entities: Vec<Entity> = entity_rows
1946            .into_iter()
1947            .map(entity_from_row)
1948            .collect::<Result<Vec<_>, _>>()?;
1949        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1950
1951        Ok((entities, edges, depth_map))
1952    }
1953
1954    /// Fetch entities and edges for a completed BFS depth map.
1955    async fn bfs_fetch_results(
1956        &self,
1957        depth_map: std::collections::HashMap<i64, u32>,
1958        at_timestamp: Option<&str>,
1959    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1960        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1961        if visited_ids.is_empty() {
1962            return Ok((Vec::new(), Vec::new(), depth_map));
1963        }
1964        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1965        if visited_ids.len() > 499 {
1966            tracing::warn!(
1967                total = visited_ids.len(),
1968                retained = 499,
1969                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1970                 some reachable entities will be dropped from results"
1971            );
1972            visited_ids.truncate(499);
1973        }
1974
1975        let n = visited_ids.len();
1976        let ph_ids1 = placeholder_list(1, n);
1977        let ph_ids2 = placeholder_list(n + 1, n);
1978        let edge_filter = if at_timestamp.is_some() {
1979            let ts_pos = n * 2 + 1;
1980            format!(
1981                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1982                ts = numbered_placeholder(ts_pos),
1983            )
1984        } else {
1985            "valid_to IS NULL".to_owned()
1986        };
1987        let edge_sql = format!(
1988            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1989                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1990                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1991             FROM graph_edges
1992             WHERE {edge_filter}
1993               AND source_entity_id IN ({ph_ids1})
1994               AND target_entity_id IN ({ph_ids2})"
1995        );
1996        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1997        for id in &visited_ids {
1998            edge_query = edge_query.bind(*id);
1999        }
2000        for id in &visited_ids {
2001            edge_query = edge_query.bind(*id);
2002        }
2003        if let Some(ts) = at_timestamp {
2004            edge_query = edge_query.bind(ts);
2005        }
2006        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
2007
2008        let entity_sql = format!(
2009            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
2010             FROM graph_entities WHERE id IN ({ph})",
2011            ph = placeholder_list(1, n),
2012        );
2013        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
2014        for id in &visited_ids {
2015            entity_query = entity_query.bind(*id);
2016        }
2017        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
2018
2019        let entities: Vec<Entity> = entity_rows
2020            .into_iter()
2021            .map(entity_from_row)
2022            .collect::<Result<Vec<_>, _>>()?;
2023        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
2024
2025        Ok((entities, edges, depth_map))
2026    }
2027
2028    // ── Backfill helpers ──────────────────────────────────────────────────────
2029
2030    /// Find an entity by name only (no type filter).
2031    ///
2032    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
2033    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
2034    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
2035    ///
2036    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
2037    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
2038    /// contains "Alice").
2039    ///
2040    /// # Errors
2041    ///
2042    /// Returns an error if the database query fails.
2043    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2044        let find_by_name_sql = format!(
2045            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2046             FROM graph_entities \
2047             WHERE name = ? {cn} OR canonical_name = ? {cn} \
2048             LIMIT 5",
2049            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2050        );
2051        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2052            .bind(name)
2053            .bind(name)
2054            .fetch_all(&self.pool)
2055            .await?;
2056
2057        if !rows.is_empty() {
2058            return rows.into_iter().map(entity_from_row).collect();
2059        }
2060
2061        self.find_entities_fuzzy(name, 5).await
2062    }
2063
2064    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2065    ///
2066    /// Reads the `graph_processed` column added by migration 021.
2067    ///
2068    /// # Errors
2069    ///
2070    /// Returns an error if the database query fails.
2071    pub async fn unprocessed_messages_for_backfill(
2072        &self,
2073        limit: usize,
2074    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2075        let limit = i64::try_from(limit)?;
2076        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2077            "SELECT id, content FROM messages
2078             WHERE graph_processed = 0
2079             ORDER BY id ASC
2080             LIMIT ?"
2081        ))
2082        .bind(limit)
2083        .fetch_all(&self.pool)
2084        .await?;
2085        Ok(rows
2086            .into_iter()
2087            .map(|(id, content)| (crate::types::MessageId(id), content))
2088            .collect())
2089    }
2090
2091    /// Return the count of messages not yet processed by graph extraction.
2092    ///
2093    /// # Errors
2094    ///
2095    /// Returns an error if the database query fails.
2096    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2097        let count: i64 = zeph_db::query_scalar(sql!(
2098            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2099        ))
2100        .fetch_one(&self.pool)
2101        .await?;
2102        Ok(count)
2103    }
2104
2105    /// Mark a batch of messages as graph-processed.
2106    ///
2107    /// # Errors
2108    ///
2109    /// Returns an error if the database query fails.
2110    pub async fn mark_messages_graph_processed(
2111        &self,
2112        ids: &[crate::types::MessageId],
2113    ) -> Result<(), MemoryError> {
2114        const MAX_BATCH: usize = 490;
2115        if ids.is_empty() {
2116            return Ok(());
2117        }
2118        for chunk in ids.chunks(MAX_BATCH) {
2119            let placeholders = placeholder_list(1, chunk.len());
2120            let sql =
2121                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2122            let mut query = zeph_db::query(&sql);
2123            for id in chunk {
2124                query = query.bind(id.0);
2125            }
2126            query.execute(&self.pool).await?;
2127        }
2128        Ok(())
2129    }
2130}
2131
2132// ── Dialect helpers ───────────────────────────────────────────────────────────
2133
2134#[cfg(feature = "sqlite")]
2135fn community_ids_sql(placeholders: &str) -> String {
2136    format!(
2137        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
2138         FROM graph_communities c, json_each(c.entity_ids) j
2139         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
2140    )
2141}
2142
2143#[cfg(feature = "postgres")]
2144fn community_ids_sql(placeholders: &str) -> String {
2145    format!(
2146        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
2147         FROM graph_communities c,
2148              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
2149         WHERE (j.value)::bigint IN ({placeholders})"
2150    )
2151}
2152
2153// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2154
2155#[derive(zeph_db::FromRow)]
2156struct EntityRow {
2157    id: i64,
2158    name: String,
2159    canonical_name: String,
2160    entity_type: String,
2161    summary: Option<String>,
2162    first_seen_at: String,
2163    last_seen_at: String,
2164    qdrant_point_id: Option<String>,
2165}
2166
2167fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2168    let entity_type = row
2169        .entity_type
2170        .parse::<EntityType>()
2171        .map_err(MemoryError::GraphStore)?;
2172    Ok(Entity {
2173        id: row.id,
2174        name: row.name,
2175        canonical_name: row.canonical_name,
2176        entity_type,
2177        summary: row.summary,
2178        first_seen_at: row.first_seen_at,
2179        last_seen_at: row.last_seen_at,
2180        qdrant_point_id: row.qdrant_point_id,
2181    })
2182}
2183
2184#[derive(zeph_db::FromRow)]
2185struct AliasRow {
2186    id: i64,
2187    entity_id: i64,
2188    alias_name: String,
2189    created_at: String,
2190}
2191
2192fn alias_from_row(row: AliasRow) -> EntityAlias {
2193    EntityAlias {
2194        id: row.id,
2195        entity_id: row.entity_id,
2196        alias_name: row.alias_name,
2197        created_at: row.created_at,
2198    }
2199}
2200
2201#[derive(zeph_db::FromRow)]
2202struct EdgeRow {
2203    id: i64,
2204    source_entity_id: i64,
2205    target_entity_id: i64,
2206    relation: String,
2207    fact: String,
2208    confidence: f64,
2209    valid_from: String,
2210    valid_to: Option<String>,
2211    created_at: String,
2212    expired_at: Option<String>,
2213    #[sqlx(rename = "episode_id")]
2214    source_message_id: Option<i64>,
2215    qdrant_point_id: Option<String>,
2216    edge_type: String,
2217    retrieval_count: i32,
2218    last_retrieved_at: Option<i64>,
2219    superseded_by: Option<i64>,
2220    canonical_relation: Option<String>,
2221    supersedes: Option<i64>,
2222    // Hebbian reinforcement weight (HL-F1, #3344). SQLite REAL maps to f64 via sqlx.
2223    weight: f64,
2224}
2225
2226fn edge_from_row(row: EdgeRow) -> Edge {
2227    let edge_type = row
2228        .edge_type
2229        .parse::<EdgeType>()
2230        .unwrap_or(EdgeType::Semantic);
2231    let canonical_relation = row
2232        .canonical_relation
2233        .unwrap_or_else(|| row.relation.clone());
2234    Edge {
2235        id: row.id,
2236        source_entity_id: row.source_entity_id,
2237        target_entity_id: row.target_entity_id,
2238        canonical_relation,
2239        relation: row.relation,
2240        fact: row.fact,
2241        #[allow(clippy::cast_possible_truncation)]
2242        confidence: row.confidence as f32,
2243        valid_from: row.valid_from,
2244        valid_to: row.valid_to,
2245        created_at: row.created_at,
2246        expired_at: row.expired_at,
2247        source_message_id: row.source_message_id.map(MessageId),
2248        qdrant_point_id: row.qdrant_point_id,
2249        edge_type,
2250        retrieval_count: row.retrieval_count,
2251        last_retrieved_at: row.last_retrieved_at,
2252        superseded_by: row.superseded_by,
2253        supersedes: row.supersedes,
2254        #[allow(clippy::cast_possible_truncation)]
2255        weight: row.weight as f32,
2256    }
2257}
2258
2259#[derive(zeph_db::FromRow)]
2260struct CommunityRow {
2261    id: i64,
2262    name: String,
2263    summary: String,
2264    entity_ids: String,
2265    fingerprint: Option<String>,
2266    created_at: String,
2267    updated_at: String,
2268}
2269
2270// ── GAAMA Episode methods ──────────────────────────────────────────────────────
2271
2272impl GraphStore {
2273    /// Ensure a GAAMA episode exists for this conversation, returning its ID.
2274    ///
2275    /// Idempotent: inserts on first call, returns existing ID on subsequent calls.
2276    ///
2277    /// # Errors
2278    ///
2279    /// Returns an error if the database query fails.
2280    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2281        // Ensure the conversation row exists before inserting into graph_episodes,
2282        // which has a FK referencing conversations(id). On a fresh database the agent
2283        // may run graph extraction before the conversation row is committed.
2284        zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2285            .bind(conversation_id)
2286            .execute(&self.pool)
2287            .await?;
2288
2289        let id: i64 = zeph_db::query_scalar(sql!(
2290            "INSERT INTO graph_episodes (conversation_id)
2291             VALUES (?)
2292             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2293             RETURNING id"
2294        ))
2295        .bind(conversation_id)
2296        .fetch_one(&self.pool)
2297        .await?;
2298        Ok(id)
2299    }
2300
2301    /// Record that an entity was observed in an episode.
2302    ///
2303    /// Idempotent: does nothing if the link already exists.
2304    ///
2305    /// # Errors
2306    ///
2307    /// Returns an error if the database query fails.
2308    pub async fn link_entity_to_episode(
2309        &self,
2310        episode_id: i64,
2311        entity_id: i64,
2312    ) -> Result<(), MemoryError> {
2313        zeph_db::query(sql!(
2314            "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2315             VALUES (?, ?)"
2316        ))
2317        .bind(episode_id)
2318        .bind(entity_id)
2319        .execute(&self.pool)
2320        .await?;
2321        Ok(())
2322    }
2323
2324    /// Return all episodes in which an entity appears.
2325    ///
2326    /// # Errors
2327    ///
2328    /// Returns an error if the database query fails.
2329    pub async fn episodes_for_entity(
2330        &self,
2331        entity_id: i64,
2332    ) -> Result<Vec<super::types::Episode>, MemoryError> {
2333        #[derive(zeph_db::FromRow)]
2334        struct EpisodeRow {
2335            id: i64,
2336            conversation_id: i64,
2337            created_at: String,
2338            closed_at: Option<String>,
2339        }
2340        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2341            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2342             FROM graph_episodes e
2343             JOIN graph_episode_entities ee ON ee.episode_id = e.id
2344             WHERE ee.entity_id = ?"
2345        ))
2346        .bind(entity_id)
2347        .fetch_all(&self.pool)
2348        .await?;
2349        Ok(rows
2350            .into_iter()
2351            .map(|r| super::types::Episode {
2352                id: r.id,
2353                conversation_id: r.conversation_id,
2354                created_at: r.created_at,
2355                closed_at: r.closed_at,
2356            })
2357            .collect())
2358    }
2359
2360    /// Insert a new edge using APEX-MEM append-only supersede semantics (FR-001).
2361    ///
2362    /// If a byte-identical active edge exists for `(src, tgt, canonical_relation, edge_type)`,
2363    /// records a reassertion in `edge_reassertions` and returns the existing edge id (FR-015).
2364    ///
2365    /// If a different active edge exists for the same `(src, canonical_relation, edge_type)` key,
2366    /// invalidates it with a supersession pointer, inserts the new edge, and sets `supersedes` on
2367    /// the new row to link the chain. Checks that the resulting chain depth would not exceed
2368    /// [`SUPERSEDE_DEPTH_CAP`] and that no cycle would be introduced before committing.
2369    ///
2370    /// Optionally increments [`ApexMetrics::supersedes_total`] when `metrics` is provided and a
2371    /// supersession actually occurs.
2372    ///
2373    /// # Errors
2374    ///
2375    /// - [`MemoryError::SupersedeCycle`] — the new edge would create a supersede cycle.
2376    /// - [`MemoryError::SupersedeDepthExceeded`] — chain depth cap would be exceeded.
2377    /// - [`MemoryError::Sqlx`] / [`MemoryError::Db`] — database errors.
2378    #[allow(clippy::too_many_arguments)]
2379    // TODO(B3): refactor into a builder or config struct to reduce argument count
2380    // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2381    pub async fn insert_or_supersede_with_metrics(
2382        &self,
2383        source_entity_id: i64,
2384        target_entity_id: i64,
2385        relation: &str,
2386        canonical_relation: &str,
2387        fact: &str,
2388        confidence: f32,
2389        episode_id: Option<MessageId>,
2390        edge_type: EdgeType,
2391        set_supersedes: bool,
2392        metrics: Option<&ApexMetrics>,
2393    ) -> Result<i64, MemoryError> {
2394        if source_entity_id == target_entity_id {
2395            return Err(MemoryError::InvalidInput(format!(
2396                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2397            )));
2398        }
2399        let confidence = confidence.clamp(0.0, 1.0);
2400        let edge_type_str = edge_type.as_str();
2401        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2402
2403        let mut tx = zeph_db::begin(&self.pool).await?;
2404
2405        if let Some(existing_id) = find_identical_active_edge(
2406            &mut tx,
2407            source_entity_id,
2408            target_entity_id,
2409            canonical_relation,
2410            edge_type_str,
2411            fact,
2412        )
2413        .await?
2414        {
2415            record_reassertion(&mut tx, existing_id, episode_raw, confidence).await?;
2416            tx.commit().await?;
2417            return Ok(existing_id);
2418        }
2419
2420        let prior_head =
2421            find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
2422                .await?;
2423
2424        // Cycle guard: depth cap also bounds cycles. Run inside the transaction using sqlx
2425        // directly to avoid a second pool acquire (SQLite pool is typically size 1 in tests).
2426        if let Some(head_id) = prior_head {
2427            check_supersede_depth_in_tx(&mut tx, head_id).await?;
2428        }
2429
2430        let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2431        let new_id = insert_new_edge(
2432            &mut tx,
2433            source_entity_id,
2434            target_entity_id,
2435            relation,
2436            canonical_relation,
2437            fact,
2438            confidence,
2439            episode_raw,
2440            edge_type_str,
2441            supersedes_val,
2442        )
2443        .await?;
2444
2445        if let Some(head_id) = prior_head {
2446            invalidate_prior_head(&mut tx, head_id, new_id).await?;
2447            if let Some(m) = metrics {
2448                m.supersedes_total
2449                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2450            }
2451        }
2452
2453        tx.commit().await?;
2454        Ok(new_id)
2455    }
2456
2457    /// Convenience wrapper: calls [`Self::insert_or_supersede_with_metrics`] with `metrics = None`.
2458    ///
2459    /// # Errors
2460    ///
2461    /// Propagates errors from [`Self::insert_or_supersede_with_metrics`].
2462    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2463    pub async fn insert_or_supersede(
2464        &self,
2465        source_entity_id: i64,
2466        target_entity_id: i64,
2467        relation: &str,
2468        canonical_relation: &str,
2469        fact: &str,
2470        confidence: f32,
2471        episode_id: Option<MessageId>,
2472        edge_type: EdgeType,
2473        set_supersedes: bool,
2474    ) -> Result<i64, MemoryError> {
2475        self.insert_or_supersede_with_metrics(
2476            source_entity_id,
2477            target_entity_id,
2478            relation,
2479            canonical_relation,
2480            fact,
2481            confidence,
2482            episode_id,
2483            edge_type,
2484            set_supersedes,
2485            None,
2486        )
2487        .await
2488    }
2489
2490    /// Walk the `supersedes` chain from `head_id` using a single recursive CTE and return its depth.
2491    ///
2492    /// Returns `0` when the edge has no `supersedes` pointer (it is the root).
2493    /// The CTE is capped at `SUPERSEDE_DEPTH_CAP + 1` to prevent unbounded recursion.
2494    ///
2495    /// # Errors
2496    ///
2497    /// Returns [`MemoryError::SupersedeCycle`] when the CTE detects a cycle (depth exceeds cap
2498    /// but the chain has not terminated), or a database error on query failure.
2499    pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2500        Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2501    }
2502
2503    async fn check_supersede_depth_with_pool(
2504        pool: &zeph_db::DbPool,
2505        head_id: i64,
2506    ) -> Result<usize, MemoryError> {
2507        let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2508        // CTE walks the supersedes chain starting at head_id (depth=0).
2509        // Each hop to a superseded ancestor increments depth. NULL supersedes terminates the walk.
2510        let depth: Option<i64> = zeph_db::query_scalar(sql!(
2511            "WITH RECURSIVE chain(id, depth) AS (
2512               SELECT id, 0 FROM graph_edges WHERE id = ?
2513               UNION ALL
2514               SELECT e.supersedes, c.depth + 1
2515               FROM graph_edges e JOIN chain c ON e.id = c.id
2516               WHERE e.supersedes IS NOT NULL AND c.depth < ?
2517             )
2518             SELECT MAX(depth) FROM chain"
2519        ))
2520        .bind(head_id)
2521        .bind(cap)
2522        .fetch_optional(pool)
2523        .await?
2524        .flatten();
2525
2526        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2527        let d = depth.unwrap_or(0) as usize;
2528        if d > SUPERSEDE_DEPTH_CAP {
2529            return Err(MemoryError::SupersedeCycle(head_id));
2530        }
2531        Ok(d)
2532    }
2533}
2534
2535// ── insert_or_supersede helpers ───────────────────────────────────────────────
2536// All helpers take `&mut zeph_db::DbTransaction` so they run inside the caller's transaction
2537// without acquiring a second connection (SQLite pool is typically size 1 in tests).
2538
2539type Tx<'a> = zeph_db::DbTransaction<'a>;
2540
2541/// Look up a byte-identical active edge (FR-015 reassertion path).
2542///
2543/// Returns the existing edge ID when all columns match exactly, or `None` otherwise.
2544async fn find_identical_active_edge(
2545    tx: &mut Tx<'_>,
2546    src: i64,
2547    tgt: i64,
2548    canon: &str,
2549    edge_type_str: &str,
2550    fact: &str,
2551) -> Result<Option<i64>, MemoryError> {
2552    Ok(zeph_db::query_scalar(sql!(
2553        "SELECT id FROM graph_edges
2554         WHERE source_entity_id = ?
2555           AND target_entity_id = ?
2556           AND canonical_relation = ?
2557           AND edge_type = ?
2558           AND fact = ?
2559           AND valid_to IS NULL
2560           AND expired_at IS NULL
2561         LIMIT 1"
2562    ))
2563    .bind(src)
2564    .bind(tgt)
2565    .bind(canon)
2566    .bind(edge_type_str)
2567    .bind(fact)
2568    .fetch_optional(&mut **tx)
2569    .await?)
2570}
2571
2572/// Record a reassertion event for an existing edge (FR-015).
2573async fn record_reassertion(
2574    tx: &mut Tx<'_>,
2575    head_id: i64,
2576    episode_raw: Option<i64>,
2577    confidence: f32,
2578) -> Result<(), MemoryError> {
2579    #[allow(clippy::cast_possible_wrap)]
2580    let asserted_at = std::time::SystemTime::now()
2581        .duration_since(std::time::UNIX_EPOCH)
2582        .unwrap_or_default()
2583        .as_secs() as i64;
2584    zeph_db::query(sql!(
2585        "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2586         VALUES (?, ?, ?, ?)"
2587    ))
2588    .bind(head_id)
2589    .bind(asserted_at)
2590    .bind(episode_raw)
2591    .bind(f64::from(confidence))
2592    .execute(&mut **tx)
2593    .await?;
2594    Ok(())
2595}
2596
2597/// Find the current active head edge for `(src, canonical_relation, edge_type)`.
2598async fn find_prior_active_head(
2599    tx: &mut Tx<'_>,
2600    src: i64,
2601    canon: &str,
2602    edge_type_str: &str,
2603) -> Result<Option<i64>, MemoryError> {
2604    Ok(zeph_db::query_scalar(sql!(
2605        "SELECT id FROM graph_edges
2606         WHERE source_entity_id = ?
2607           AND canonical_relation = ?
2608           AND edge_type = ?
2609           AND valid_to IS NULL
2610           AND expired_at IS NULL
2611         ORDER BY created_at DESC
2612         LIMIT 1"
2613    ))
2614    .bind(src)
2615    .bind(canon)
2616    .bind(edge_type_str)
2617    .fetch_optional(&mut **tx)
2618    .await?)
2619}
2620
2621/// Verify the supersede chain depth for `head_id` does not exceed [`SUPERSEDE_DEPTH_CAP`].
2622///
2623/// Returns `Err(MemoryError::SupersedeDepthExceeded)` when the cap would be exceeded.
2624async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2625    let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2626    let depth: Option<i64> = sqlx::query_scalar(
2627        "WITH RECURSIVE chain(id, depth) AS (
2628           SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2629           UNION ALL
2630           SELECT e.supersedes, c.depth + 1
2631           FROM graph_edges e JOIN chain c ON e.id = c.id
2632           WHERE e.supersedes IS NOT NULL AND c.depth < ?
2633         )
2634         SELECT MAX(depth) FROM chain",
2635    )
2636    .bind(head_id)
2637    .bind(cap)
2638    .fetch_optional(&mut **tx)
2639    .await?
2640    .flatten();
2641    let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2642    if d > SUPERSEDE_DEPTH_CAP {
2643        return Err(MemoryError::SupersedeDepthExceeded(head_id));
2644    }
2645    Ok(())
2646}
2647
2648/// Insert a new edge row and return its generated ID.
2649#[allow(clippy::too_many_arguments)]
2650async fn insert_new_edge(
2651    tx: &mut Tx<'_>,
2652    src: i64,
2653    tgt: i64,
2654    relation: &str,
2655    canonical_relation: &str,
2656    fact: &str,
2657    confidence: f32,
2658    episode_raw: Option<i64>,
2659    edge_type_str: &str,
2660    supersedes_val: Option<i64>,
2661) -> Result<i64, MemoryError> {
2662    Ok(zeph_db::query_scalar(sql!(
2663        "INSERT INTO graph_edges
2664         (source_entity_id, target_entity_id, relation, canonical_relation, fact,
2665          confidence, episode_id, edge_type, supersedes)
2666         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2667         RETURNING id"
2668    ))
2669    .bind(src)
2670    .bind(tgt)
2671    .bind(relation)
2672    .bind(canonical_relation)
2673    .bind(fact)
2674    .bind(f64::from(confidence))
2675    .bind(episode_raw)
2676    .bind(edge_type_str)
2677    .bind(supersedes_val)
2678    .fetch_one(&mut **tx)
2679    .await?)
2680}
2681
2682/// Mark the prior head edge as superseded by `new_id`.
2683async fn invalidate_prior_head(
2684    tx: &mut Tx<'_>,
2685    head_id: i64,
2686    new_id: i64,
2687) -> Result<(), MemoryError> {
2688    zeph_db::query(sql!(
2689        "UPDATE graph_edges
2690         SET valid_to = CURRENT_TIMESTAMP,
2691             expired_at = CURRENT_TIMESTAMP,
2692             superseded_by = ?
2693         WHERE id = ?"
2694    ))
2695    .bind(new_id)
2696    .bind(head_id)
2697    .execute(&mut **tx)
2698    .await?;
2699    Ok(())
2700}
2701
2702// ── FTS helpers ───────────────────────────────────────────────────────────────
2703
2704/// Row type for the UNION ALL FTS5 query in `find_entities_ranked`.
2705type EntityFtsRow = (
2706    i64,
2707    String,
2708    String,
2709    String,
2710    Option<String>,
2711    String,
2712    String,
2713    Option<String>,
2714    f64,
2715);
2716
2717/// Sanitize and tokenize `query` into an FTS5 query string.
2718///
2719/// Returns `None` when the input is empty or contains only FTS5 operator tokens,
2720/// indicating no search should be attempted.
2721fn build_fts_query(query: &str) -> Option<String> {
2722    const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
2723    let sanitized = sanitize_fts_query(query);
2724    if sanitized.is_empty() {
2725        return None;
2726    }
2727    let fts_query: String = sanitized
2728        .split_whitespace()
2729        .filter(|t| !FTS5_OPERATORS.contains(t))
2730        .map(|t| format!("{t}*"))
2731        .collect::<Vec<_>>()
2732        .join(" ");
2733    if fts_query.is_empty() {
2734        None
2735    } else {
2736        Some(fts_query)
2737    }
2738}
2739
2740/// Build the UNION ALL FTS5 SQL for entity ranked search.
2741///
2742/// The SQL selects entities by direct FTS5 match (negated BM25) and by alias LIKE match
2743/// (fixed score 0.5), then orders by score descending with a LIMIT applied outside.
2744fn build_ranked_fts_sql() -> String {
2745    format!(
2746        "SELECT * FROM ( \
2747             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2748                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2749                    -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
2750             FROM graph_entities_fts fts \
2751             JOIN graph_entities e ON e.id = fts.rowid \
2752             WHERE graph_entities_fts MATCH ? \
2753             UNION ALL \
2754             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2755                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2756                    0.5 AS fts_rank \
2757             FROM graph_entity_aliases a \
2758             JOIN graph_entities e ON e.id = a.entity_id \
2759             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
2760         ) \
2761         ORDER BY fts_rank DESC \
2762         LIMIT ?",
2763        <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2764    )
2765}
2766
2767/// Normalize FTS scores to `[0, 1]` and deduplicate by entity ID.
2768///
2769/// Deduplication keeps the first (highest-ranked) occurrence of each entity ID.
2770fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
2771    // Guard against div-by-zero when all scores are 0.
2772    let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
2773    let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
2774
2775    let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
2776    let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
2777    for (
2778        id,
2779        name,
2780        canonical_name,
2781        entity_type_str,
2782        summary,
2783        first_seen_at,
2784        last_seen_at,
2785        qdrant_point_id,
2786        raw_score,
2787    ) in rows
2788    {
2789        if !seen_ids.insert(id) {
2790            continue;
2791        }
2792        let entity_type = entity_type_str
2793            .parse()
2794            .unwrap_or(super::types::EntityType::Concept);
2795        let entity = Entity {
2796            id,
2797            name,
2798            canonical_name,
2799            entity_type,
2800            summary,
2801            first_seen_at,
2802            last_seen_at,
2803            qdrant_point_id,
2804        };
2805        #[allow(clippy::cast_possible_truncation)]
2806        let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
2807        result.push((entity, normalized));
2808    }
2809    result
2810}
2811
2812// ── Tests ─────────────────────────────────────────────────────────────────────
2813
2814#[cfg(test)]
2815mod tests;