Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! SQLite-backed knowledge graph store.
5//!
6//! [`GraphStore`] manages the `graph_entities`, `graph_edges`, `graph_entity_aliases`,
7//! `graph_communities`, and `graph_episodes` tables. It is the persistence layer for
8//! the MAGMA / GAAMA graph subsystem.
9
10use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::{EntityId, MessageId};
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
/// SQLite-backed persistence layer for the knowledge graph.
///
/// All graph mutations go through this type: entity upserts, edge creation,
/// alias recording, community assignment, and episode management.
///
/// Obtain an instance via [`GraphStore::new`] after running the `zeph-db` migrations.
/// The two Benna-Fusi rates start at their defaults and can be replaced with
/// [`GraphStore::with_benna_rates`].
pub struct GraphStore {
    /// Connection pool that every query issued by this store runs against.
    pool: DbPool,
    /// Benna-Fusi fast-variable learning rate. Range: `(0.0, 1.0]`. Default: `0.5`.
    ///
    /// Used when an already-active edge is observed again: it controls how far
    /// `confidence_fast` moves towards the incoming confidence.
    benna_fast_rate: f32,
    /// Benna-Fusi slow-variable learning rate. Range: `(0.0, 1.0]`. Default: `0.05`.
    ///
    /// Used when an already-active edge is observed again: it controls how far
    /// `confidence_slow` moves towards the freshly updated fast value.
    benna_slow_rate: f32,
}
37
38impl GraphStore {
39    /// Create a new `GraphStore` wrapping `pool`.
40    ///
41    /// The pool must come from a database with the `zeph-db` migrations applied.
42    #[must_use]
43    pub fn new(pool: DbPool) -> Self {
44        Self {
45            pool,
46            benna_fast_rate: 0.5,
47            benna_slow_rate: 0.05,
48        }
49    }
50
51    /// Set Benna-Fusi learning rates for the multi-timescale synaptic update (#3709).
52    ///
53    /// Call this after [`Self::new`] when the rates come from `[memory.graph.spreading_activation]`.
54    #[must_use]
55    pub fn with_benna_rates(mut self, fast_rate: f32, slow_rate: f32) -> Self {
56        self.benna_fast_rate = fast_rate;
57        self.benna_slow_rate = slow_rate;
58        self
59    }
60
    /// Access the underlying database pool.
    ///
    /// Returns a shared borrow of the `DbPool` this store holds, so callers can issue
    /// their own queries against the same database.
    #[must_use]
    pub fn pool(&self) -> &DbPool {
        &self.pool
    }
66
67    // ── Entities ─────────────────────────────────────────────────────────────
68
69    /// Insert or update an entity by `(canonical_name, entity_type)`.
70    ///
71    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
72    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
73    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
74    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if the database query fails.
79    pub async fn upsert_entity(
80        &self,
81        surface_name: &str,
82        canonical_name: &str,
83        entity_type: EntityType,
84        summary: Option<&str>,
85    ) -> Result<EntityId, MemoryError> {
86        let type_str = entity_type.as_str();
87        let id: i64 = zeph_db::query_scalar(sql!(
88            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
89             VALUES (?, ?, ?, ?)
90             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
91               name = excluded.name,
92               summary = COALESCE(excluded.summary, summary),
93               last_seen_at = CURRENT_TIMESTAMP
94             RETURNING id"
95        ))
96        .bind(surface_name)
97        .bind(canonical_name)
98        .bind(type_str)
99        .bind(summary)
100        .fetch_one(&self.pool)
101        .await?;
102        Ok(EntityId(id))
103    }
104
105    /// Find an entity by exact canonical name and type.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the database query fails.
110    pub async fn find_entity(
111        &self,
112        canonical_name: &str,
113        entity_type: EntityType,
114    ) -> Result<Option<Entity>, MemoryError> {
115        let type_str = entity_type.as_str();
116        let row: Option<EntityRow> = zeph_db::query_as(
117            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118             FROM graph_entities
119             WHERE canonical_name = ? AND entity_type = ?"),
120        )
121        .bind(canonical_name)
122        .bind(type_str)
123        .fetch_optional(&self.pool)
124        .await?;
125        row.map(entity_from_row).transpose()
126    }
127
128    /// Find an entity by its numeric ID.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if the database query fails.
133    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
134        let row: Option<EntityRow> = zeph_db::query_as(
135            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
136             FROM graph_entities
137             WHERE id = ?"),
138        )
139        .bind(entity_id)
140        .fetch_optional(&self.pool)
141        .await?;
142        row.map(entity_from_row).transpose()
143    }
144
145    /// Update the `qdrant_point_id` for an entity.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the database query fails.
150    pub async fn set_entity_qdrant_point_id(
151        &self,
152        entity_id: i64,
153        point_id: &str,
154    ) -> Result<(), MemoryError> {
155        zeph_db::query(sql!(
156            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
157        ))
158        .bind(point_id)
159        .bind(entity_id)
160        .execute(&self.pool)
161        .await?;
162        Ok(())
163    }
164
165    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
166    ///
167    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
168    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
169    /// alias matches via a UNION query.
170    ///
171    /// # Behavioral note
172    ///
173    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
174    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
175    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
176    /// mid-word substrings are not matched. This is a known trade-off for index performance.
177    ///
178    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
179    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
180    /// causes noise in practice, add a minimum length guard at the call site.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the database query fails.
185    pub async fn find_entities_fuzzy(
186        &self,
187        query: &str,
188        limit: usize,
189    ) -> Result<Vec<Entity>, MemoryError> {
190        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
191        // prevents syntax errors when user input contains them as literal search terms
192        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
193        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
194        let query = &query[..query.floor_char_boundary(512)];
195        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
196        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
197        let sanitized = sanitize_fts_query(query);
198        if sanitized.is_empty() {
199            return Ok(vec![]);
200        }
201        let fts_query: String = sanitized
202            .split_whitespace()
203            .filter(|t| !FTS5_OPERATORS.contains(t))
204            .map(|t| format!("{t}*"))
205            .collect::<Vec<_>>()
206            .join(" ");
207        if fts_query.is_empty() {
208            return Ok(vec![]);
209        }
210
211        let limit = i64::try_from(limit)?;
212        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
213        // bm25() returns negative values; ORDER BY ASC puts best matches first.
214        let search_sql = format!(
215            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
216                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
217             FROM graph_entities_fts fts \
218             JOIN graph_entities e ON e.id = fts.rowid \
219             WHERE graph_entities_fts MATCH ? \
220             UNION \
221             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
222                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
223             FROM graph_entity_aliases a \
224             JOIN graph_entities e ON e.id = a.entity_id \
225             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
226             LIMIT ?",
227            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
228        );
229        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
230            .bind(&fts_query)
231            .bind(format!(
232                "%{}%",
233                query
234                    .trim()
235                    .replace('\\', "\\\\")
236                    .replace('%', "\\%")
237                    .replace('_', "\\_")
238            ))
239            .bind(limit)
240            .fetch_all(&self.pool)
241            .await?;
242        rows.into_iter()
243            .map(entity_from_row)
244            .collect::<Result<Vec<_>, _>>()
245    }
246
247    /// Flush the `SQLite` WAL to the main database file.
248    ///
249    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
250    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
251    /// visible to connections opened in future sessions.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if the PRAGMA execution fails.
256    #[cfg(all(feature = "sqlite", not(feature = "postgres")))]
257    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
258        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
259            .execute(&self.pool)
260            .await?;
261        Ok(())
262    }
263
    /// No-op on `PostgreSQL` (WAL management is handled by the server).
    ///
    /// Compiled when the `postgres` feature is enabled. It has the same name and signature
    /// as the `SQLite` variant, so call sites do not need their own feature gates.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    #[cfg(feature = "postgres")]
    #[allow(clippy::unused_async)] // stays `async` to mirror the SQLite variant's signature
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
274
275    /// Stream all entities from the database incrementally (true cursor, no full-table load).
276    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
277        use futures::StreamExt as _;
278        zeph_db::query_as::<_, EntityRow>(
279            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
280             FROM graph_entities ORDER BY id ASC"),
281        )
282        .fetch(&self.pool)
283        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
284            r.map_err(MemoryError::from).and_then(entity_from_row)
285        })
286    }
287
288    // ── Alias methods ─────────────────────────────────────────────────────────
289
290    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the database query fails.
295    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
296        let insert_alias_sql = format!(
297            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
298            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
299            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
300        );
301        zeph_db::query(&insert_alias_sql)
302            .bind(entity_id)
303            .bind(alias_name)
304            .execute(&self.pool)
305            .await?;
306        Ok(())
307    }
308
309    /// Find an entity by alias name and entity type (case-insensitive).
310    ///
311    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the database query fails.
316    pub async fn find_entity_by_alias(
317        &self,
318        alias_name: &str,
319        entity_type: EntityType,
320    ) -> Result<Option<Entity>, MemoryError> {
321        let type_str = entity_type.as_str();
322        let alias_typed_sql = format!(
323            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
324                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
325             FROM graph_entity_aliases a \
326             JOIN graph_entities e ON e.id = a.entity_id \
327             WHERE a.alias_name = ? {} \
328               AND e.entity_type = ? \
329             ORDER BY e.id ASC \
330             LIMIT 1",
331            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
332        );
333        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
334            .bind(alias_name)
335            .bind(type_str)
336            .fetch_optional(&self.pool)
337            .await?;
338        row.map(entity_from_row).transpose()
339    }
340
341    /// Get all aliases for an entity.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the database query fails.
346    pub async fn aliases_for_entity(
347        &self,
348        entity_id: i64,
349    ) -> Result<Vec<EntityAlias>, MemoryError> {
350        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
351            "SELECT id, entity_id, alias_name, created_at
352             FROM graph_entity_aliases
353             WHERE entity_id = ?
354             ORDER BY id ASC"
355        ))
356        .bind(entity_id)
357        .fetch_all(&self.pool)
358        .await?;
359        Ok(rows.into_iter().map(alias_from_row).collect())
360    }
361
362    /// Collect all entities into a Vec.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the database query fails or `entity_type` parsing fails.
367    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
368        use futures::TryStreamExt as _;
369        self.all_entities_stream().try_collect().await
370    }
371
372    /// Count the total number of entities.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if the database query fails.
377    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
378        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
379            .fetch_one(&self.pool)
380            .await?;
381        Ok(count)
382    }
383
384    // ── Edges ─────────────────────────────────────────────────────────────────
385
386    /// Insert a new edge between two entities, or update the existing active edge.
387    ///
388    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
389    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
390    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
391    /// duplicate edges from repeated extraction of the same context messages.
392    ///
393    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
394    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
395    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
396    /// update the first and lose the type classification.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the database query fails.
401    pub async fn insert_edge(
402        &self,
403        source_entity_id: i64,
404        target_entity_id: i64,
405        relation: &str,
406        fact: &str,
407        confidence: f32,
408        episode_id: Option<MessageId>,
409    ) -> Result<i64, MemoryError> {
410        self.insert_edge_typed(
411            source_entity_id,
412            target_entity_id,
413            relation,
414            fact,
415            confidence,
416            episode_id,
417            EdgeType::Semantic,
418        )
419        .await
420    }
421
422    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
423    ///
424    /// Identical semantics to [`Self::insert_edge`] but with an explicit `edge_type` parameter.
425    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if the database query fails.
430    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
431    pub async fn insert_edge_typed(
432        &self,
433        source_entity_id: i64,
434        target_entity_id: i64,
435        relation: &str,
436        fact: &str,
437        confidence: f32,
438        episode_id: Option<MessageId>,
439        edge_type: EdgeType,
440    ) -> Result<i64, MemoryError> {
441        if source_entity_id == target_entity_id {
442            return Err(MemoryError::InvalidInput(format!(
443                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
444            )));
445        }
446        let confidence = confidence.clamp(0.0, 1.0);
447        let edge_type_str = edge_type.as_str();
448
449        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
450        // between existence check and write. The unique partial index uq_graph_edges_active
451        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
452        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
453        let mut tx = zeph_db::begin(&self.pool).await?;
454
455        let existing: Option<(i64, f64, f64, f64)> = zeph_db::query_as(sql!(
456            "SELECT id, confidence, confidence_fast, confidence_slow FROM graph_edges
457             WHERE source_entity_id = ?
458               AND target_entity_id = ?
459               AND relation = ?
460               AND edge_type = ?
461               AND valid_to IS NULL
462             LIMIT 1"
463        ))
464        .bind(source_entity_id)
465        .bind(target_entity_id)
466        .bind(relation)
467        .bind(edge_type_str)
468        .fetch_optional(&mut *tx)
469        .await?;
470
471        if let Some((existing_id, stored_conf, stored_fast, stored_slow)) = existing {
472            let updated_conf = f64::from(confidence).max(stored_conf);
473            // Benna-Fusi multi-timescale update (#3709):
474            // fast tracks incoming confidence at high plasticity; slow integrates fast at high retention.
475            // Clamp DB-read values to [0, 1] as defense-in-depth against corrupted rows.
476            #[allow(clippy::cast_possible_truncation)]
477            let stored_fast_f32 = (stored_fast as f32).clamp(0.0, 1.0);
478            #[allow(clippy::cast_possible_truncation)]
479            let stored_slow_f32 = (stored_slow as f32).clamp(0.0, 1.0);
480            let new_fast = stored_fast_f32 + self.benna_fast_rate * (confidence - stored_fast_f32);
481            let new_slow = stored_slow_f32 + self.benna_slow_rate * (new_fast - stored_slow_f32);
482            zeph_db::query(sql!(
483                "UPDATE graph_edges SET confidence = ?, confidence_fast = ?, confidence_slow = ? WHERE id = ?"
484            ))
485            .bind(updated_conf)
486            .bind(f64::from(new_fast))
487            .bind(f64::from(new_slow))
488            .bind(existing_id)
489            .execute(&mut *tx)
490            .await?;
491            tx.commit().await?;
492            return Ok(existing_id);
493        }
494
495        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
496        let id: i64 = zeph_db::query_scalar(sql!(
497            "INSERT INTO graph_edges
498             (source_entity_id, target_entity_id, relation, fact, confidence,
499              confidence_fast, confidence_slow, episode_id, edge_type)
500             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
501             RETURNING id"
502        ))
503        .bind(source_entity_id)
504        .bind(target_entity_id)
505        .bind(relation)
506        .bind(fact)
507        .bind(f64::from(confidence))
508        .bind(f64::from(confidence))
509        .bind(f64::from(confidence))
510        .bind(episode_raw)
511        .bind(edge_type_str)
512        .fetch_one(&mut *tx)
513        .await?;
514        tx.commit().await?;
515        Ok(id)
516    }
517
518    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
519    ///
520    /// # Errors
521    ///
522    /// Returns an error if the database update fails.
523    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
524        zeph_db::query(sql!(
525            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
526             WHERE id = ?"
527        ))
528        .bind(edge_id)
529        .execute(&self.pool)
530        .await?;
531        Ok(())
532    }
533
534    /// Invalidate an edge and record the supersession pointer for Kumiho belief revision audit trail.
535    ///
536    /// Sets `valid_to`, `expired_at`, and `superseded_by` on the old edge to link it to its replacement.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if the database update fails.
541    pub async fn invalidate_edge_with_supersession(
542        &self,
543        old_edge_id: i64,
544        new_edge_id: i64,
545    ) -> Result<(), MemoryError> {
546        zeph_db::query(sql!(
547            "UPDATE graph_edges
548             SET valid_to = CURRENT_TIMESTAMP,
549                 expired_at = CURRENT_TIMESTAMP,
550                 superseded_by = ?
551             WHERE id = ?"
552        ))
553        .bind(new_edge_id)
554        .bind(old_edge_id)
555        .execute(&self.pool)
556        .await?;
557        Ok(())
558    }
559
560    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
561    ///
562    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
563    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
564    /// in v1 — see `bfs_at_timestamp` for historical traversal).
565    ///
566    /// # `SQLite` bind limit safety
567    ///
568    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
569    /// default). Each entity ID requires two bind slots (source OR target), so batches are
570    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
571    /// compile-time `SQLite` configuration.
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if the database query fails.
576    pub async fn edges_for_entities(
577        &self,
578        entity_ids: &[i64],
579        edge_types: &[super::types::EdgeType],
580    ) -> Result<Vec<Edge>, MemoryError> {
581        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
582        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
583        // 490 * 2 = 980, leaving headroom for future query additions.
584        const MAX_BATCH_ENTITIES: usize = 490;
585
586        let mut all_edges: Vec<Edge> = Vec::new();
587
588        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
589            let edges = self.query_batch_edges(chunk, edge_types).await?;
590            all_edges.extend(edges);
591        }
592
593        Ok(all_edges)
594    }
595
596    /// Query active edges for a single chunk of entity IDs (internal helper).
597    ///
598    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
599    ///
600    /// # Errors
601    ///
602    /// Returns an error if any database query fails.
603    async fn query_batch_edges(
604        &self,
605        entity_ids: &[i64],
606        edge_types: &[super::types::EdgeType],
607    ) -> Result<Vec<Edge>, MemoryError> {
608        if entity_ids.is_empty() {
609            return Ok(Vec::new());
610        }
611
612        // Build a parameterized IN clause with backend-appropriate placeholders.
613        // We cannot use the sql! macro here because the placeholder count is dynamic.
614        let n_ids = entity_ids.len();
615        let n_types = edge_types.len();
616
617        let sql = if n_types == 0 {
618            // placeholders used twice (source IN and target IN)
619            let placeholders = placeholder_list(1, n_ids);
620            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
621            format!(
622                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
623                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
624                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
625                 FROM graph_edges
626                 WHERE valid_to IS NULL
627                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
628            )
629        } else {
630            let placeholders = placeholder_list(1, n_ids);
631            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
632            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
633            format!(
634                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
635                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
636                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
637                 FROM graph_edges
638                 WHERE valid_to IS NULL
639                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
640                   AND edge_type IN ({type_placeholders})"
641            )
642        };
643
644        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
645        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
646        for id in entity_ids {
647            query = query.bind(*id);
648        }
649        for id in entity_ids {
650            query = query.bind(*id);
651        }
652        for et in edge_types {
653            query = query.bind(et.as_str());
654        }
655
656        // Wrap pool.acquire() + query execution in a short timeout to prevent the outer
657        // tokio::time::timeout (in SemanticMemory recall) from cancelling a mid-acquire
658        // future, which causes sqlx 0.8 semaphore count drift and permanent pool starvation.
659        let rows: Vec<EdgeRow> = tokio::time::timeout(
660            std::time::Duration::from_millis(500),
661            query.fetch_all(&self.pool),
662        )
663        .await
664        .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
665        Ok(rows.into_iter().map(edge_from_row).collect())
666    }
667
668    /// Get all active edges where entity is source or target.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if the database query fails.
673    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
674        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
675            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
676                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
677                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
678             FROM graph_edges
679             WHERE valid_to IS NULL
680               AND (source_entity_id = ? OR target_entity_id = ?)"
681        ))
682        .bind(entity_id)
683        .bind(entity_id)
684        .fetch_all(&self.pool)
685        .await?;
686        Ok(rows.into_iter().map(edge_from_row).collect())
687    }
688
689    /// Get all edges (active and expired) where entity is source or target, ordered by
690    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
691    ///
692    /// # Errors
693    ///
694    /// Returns an error if the database query fails or if `limit` overflows `i64`.
695    pub async fn edge_history_for_entity(
696        &self,
697        entity_id: i64,
698        limit: usize,
699    ) -> Result<Vec<Edge>, MemoryError> {
700        let limit = i64::try_from(limit)?;
701        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
702            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
703                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
704                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
705             FROM graph_edges
706             WHERE source_entity_id = ? OR target_entity_id = ?
707             ORDER BY valid_from DESC
708             LIMIT ?"
709        ))
710        .bind(entity_id)
711        .bind(entity_id)
712        .bind(limit)
713        .fetch_all(&self.pool)
714        .await?;
715        Ok(rows.into_iter().map(edge_from_row).collect())
716    }
717
718    /// Get all active edges between two entities (both directions).
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the database query fails.
723    pub async fn edges_between(
724        &self,
725        entity_a: i64,
726        entity_b: i64,
727    ) -> Result<Vec<Edge>, MemoryError> {
728        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
729            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
730                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
731                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
732             FROM graph_edges
733             WHERE valid_to IS NULL
734               AND ((source_entity_id = ? AND target_entity_id = ?)
735                 OR (source_entity_id = ? AND target_entity_id = ?))"
736        ))
737        .bind(entity_a)
738        .bind(entity_b)
739        .bind(entity_b)
740        .bind(entity_a)
741        .fetch_all(&self.pool)
742        .await?;
743        Ok(rows.into_iter().map(edge_from_row).collect())
744    }
745
746    /// Get active edges from `source` to `target` in the exact direction (no reverse).
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if the database query fails.
751    pub async fn edges_exact(
752        &self,
753        source_entity_id: i64,
754        target_entity_id: i64,
755    ) -> Result<Vec<Edge>, MemoryError> {
756        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
757            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
758                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
759                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
760             FROM graph_edges
761             WHERE valid_to IS NULL
762               AND source_entity_id = ?
763               AND target_entity_id = ?"
764        ))
765        .bind(source_entity_id)
766        .bind(target_entity_id)
767        .fetch_all(&self.pool)
768        .await?;
769        Ok(rows.into_iter().map(edge_from_row).collect())
770    }
771
772    /// Count active (non-invalidated) edges.
773    ///
774    /// # Errors
775    ///
776    /// Returns an error if the database query fails.
777    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
778        let count: i64 = zeph_db::query_scalar(sql!(
779            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
780        ))
781        .fetch_one(&self.pool)
782        .await?;
783        Ok(count)
784    }
785
786    /// Return per-type active edge counts as `(edge_type, count)` pairs.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the database query fails.
791    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
792        let rows: Vec<(String, i64)> = zeph_db::query_as(
793            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
794        )
795        .fetch_all(&self.pool)
796        .await?;
797        Ok(rows)
798    }
799
800    // ── Communities ───────────────────────────────────────────────────────────
801
802    /// Insert or update a community by name.
803    ///
804    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
805    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
806    /// `assign_to_community` adds an entity without a full re-detection pass).
807    ///
808    /// # Errors
809    ///
810    /// Returns an error if the database query fails or JSON serialization fails.
811    pub async fn upsert_community(
812        &self,
813        name: &str,
814        summary: &str,
815        entity_ids: &[i64],
816        fingerprint: Option<&str>,
817    ) -> Result<i64, MemoryError> {
818        let entity_ids_json = serde_json::to_string(entity_ids)?;
819        let id: i64 = zeph_db::query_scalar(sql!(
820            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
821             VALUES (?, ?, ?, ?)
822             ON CONFLICT(name) DO UPDATE SET
823               summary = excluded.summary,
824               entity_ids = excluded.entity_ids,
825               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
826               updated_at = CURRENT_TIMESTAMP
827             RETURNING id"
828        ))
829        .bind(name)
830        .bind(summary)
831        .bind(entity_ids_json)
832        .bind(fingerprint)
833        .fetch_one(&self.pool)
834        .await?;
835        Ok(id)
836    }
837
838    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
839    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
840    ///
841    /// # Errors
842    ///
843    /// Returns an error if the database query fails.
844    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
845        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
846            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
847        ))
848        .fetch_all(&self.pool)
849        .await?;
850        Ok(rows.into_iter().collect())
851    }
852
853    /// Delete a single community by its primary key.
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if the database query fails.
858    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
859        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
860            .bind(id)
861            .execute(&self.pool)
862            .await?;
863        Ok(())
864    }
865
866    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
867    ///
868    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
869    /// ensuring the next `detect_communities` run re-summarizes the affected community.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if the database query fails.
874    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
875        zeph_db::query(sql!(
876            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
877        ))
878        .bind(id)
879        .execute(&self.pool)
880        .await?;
881        Ok(())
882    }
883
884    /// Find the first community that contains the given `entity_id`.
885    ///
886    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
887    /// table scan with per-row JSON parsing.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the database query fails or JSON parsing fails.
892    pub async fn community_for_entity(
893        &self,
894        entity_id: i64,
895    ) -> Result<Option<Community>, MemoryError> {
896        let row: Option<CommunityRow> = zeph_db::query_as(
897            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
898             FROM graph_communities c, json_each(c.entity_ids) j
899             WHERE CAST(j.value AS INTEGER) = ?
900             LIMIT 1"),
901        )
902        .bind(entity_id)
903        .fetch_optional(&self.pool)
904        .await?;
905        match row {
906            Some(row) => {
907                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
908                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
909                Ok(Some(Community {
910                    id: row.id,
911                    name: row.name,
912                    summary: row.summary,
913                    entity_ids,
914                    fingerprint: row.fingerprint,
915                    created_at: row.created_at,
916                    updated_at: row.updated_at,
917                }))
918            }
919            None => Ok(None),
920        }
921    }
922
923    /// Get all communities.
924    ///
925    /// # Errors
926    ///
927    /// Returns an error if the database query fails or JSON parsing fails.
928    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
929        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
930            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
931             FROM graph_communities
932             ORDER BY id ASC"
933        ))
934        .fetch_all(&self.pool)
935        .await?;
936
937        rows.into_iter()
938            .map(|row| {
939                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
940                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
941                Ok(Community {
942                    id: row.id,
943                    name: row.name,
944                    summary: row.summary,
945                    entity_ids,
946                    fingerprint: row.fingerprint,
947                    created_at: row.created_at,
948                    updated_at: row.updated_at,
949                })
950            })
951            .collect()
952    }
953
954    /// Count the total number of communities.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the database query fails.
959    pub async fn community_count(&self) -> Result<i64, MemoryError> {
960        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
961            .fetch_one(&self.pool)
962            .await?;
963        Ok(count)
964    }
965
966    // ── Metadata ──────────────────────────────────────────────────────────────
967
968    /// Get a metadata value by key.
969    ///
970    /// # Errors
971    ///
972    /// Returns an error if the database query fails.
973    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
974        let val: Option<String> =
975            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
976                .bind(key)
977                .fetch_optional(&self.pool)
978                .await?;
979        Ok(val)
980    }
981
982    /// Set a metadata value by key (upsert).
983    ///
984    /// # Errors
985    ///
986    /// Returns an error if the database query fails.
987    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
988        zeph_db::query(sql!(
989            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
990             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
991        ))
992        .bind(key)
993        .bind(value)
994        .execute(&self.pool)
995        .await?;
996        Ok(())
997    }
998
999    /// Get the current extraction count from metadata.
1000    ///
1001    /// Returns 0 if the counter has not been initialized.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the database query fails.
1006    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
1007        let val = self.get_metadata("extraction_count").await?;
1008        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
1009    }
1010
1011    /// Stream all active (non-invalidated) edges.
1012    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
1013        use futures::StreamExt as _;
1014        zeph_db::query_as::<_, EdgeRow>(sql!(
1015            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1016                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1017                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1018             FROM graph_edges
1019             WHERE valid_to IS NULL
1020             ORDER BY id ASC"
1021        ))
1022        .fetch(&self.pool)
1023        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
1024    }
1025
1026    /// Fetch a chunk of active edges using keyset pagination.
1027    ///
1028    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
1029    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
1030    /// the returned chunk as `after_id` for the next page. An empty result means all
1031    /// edges have been consumed.
1032    ///
1033    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
1034    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
1035    /// and will appear in subsequent chunks or after the last chunk, never causing
1036    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
1037    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns an error if the database query fails.
1042    pub async fn edges_after_id(
1043        &self,
1044        after_id: i64,
1045        limit: i64,
1046    ) -> Result<Vec<Edge>, MemoryError> {
1047        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1048            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1049                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1050                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1051             FROM graph_edges
1052             WHERE valid_to IS NULL AND id > ?
1053             ORDER BY id ASC
1054             LIMIT ?"
1055        ))
1056        .bind(after_id)
1057        .bind(limit)
1058        .fetch_all(&self.pool)
1059        .await?;
1060        Ok(rows.into_iter().map(edge_from_row).collect())
1061    }
1062
1063    /// Find a community by its primary key.
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if the database query fails or JSON parsing fails.
1068    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1069        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1070            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1071             FROM graph_communities
1072             WHERE id = ?"
1073        ))
1074        .bind(id)
1075        .fetch_optional(&self.pool)
1076        .await?;
1077        match row {
1078            Some(row) => {
1079                let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1080                let entity_ids = raw_ids.into_iter().map(EntityId).collect();
1081                Ok(Some(Community {
1082                    id: row.id,
1083                    name: row.name,
1084                    summary: row.summary,
1085                    entity_ids,
1086                    fingerprint: row.fingerprint,
1087                    created_at: row.created_at,
1088                    updated_at: row.updated_at,
1089                }))
1090            }
1091            None => Ok(None),
1092        }
1093    }
1094
1095    /// Delete all communities (full rebuild before upsert).
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if the database query fails.
1100    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1101        zeph_db::query(sql!("DELETE FROM graph_communities"))
1102            .execute(&self.pool)
1103            .await?;
1104        Ok(())
1105    }
1106
1107    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1108
1109    /// Find entities matching `query` and return them with normalized FTS5 scores.
1110    ///
1111    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1112    /// by dividing each negated BM25 value by the maximum in the result set.
1113    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1114    ///
1115    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if the database query fails.
1120    pub async fn find_entities_ranked(
1121        &self,
1122        query: &str,
1123        limit: usize,
1124    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1125        let query = &query[..query.floor_char_boundary(512)];
1126        let Some(fts_query) = build_fts_query(query) else {
1127            return Ok(vec![]);
1128        };
1129
1130        let limit_i64 = i64::try_from(limit)?;
1131        let ranked_fts_sql = build_ranked_fts_sql();
1132        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1133            .bind(&fts_query)
1134            .bind(format!(
1135                "%{}%",
1136                query
1137                    .trim()
1138                    .replace('\\', "\\\\")
1139                    .replace('%', "\\%")
1140                    .replace('_', "\\_")
1141            ))
1142            .bind(limit_i64)
1143            .fetch_all(&self.pool)
1144            .await?;
1145
1146        if rows.is_empty() {
1147            return Ok(vec![]);
1148        }
1149
1150        Ok(normalize_and_dedup(rows))
1151    }
1152
1153    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1154    ///
1155    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1156    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1157    /// Entities with no edges receive score `0.0`.
1158    ///
1159    /// # Errors
1160    ///
1161    /// Returns an error if the database query fails.
1162    pub async fn entity_structural_scores(
1163        &self,
1164        entity_ids: &[i64],
1165    ) -> Result<HashMap<i64, f32>, MemoryError> {
1166        // Each query binds entity_ids three times (three IN clauses).
1167        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1168        const MAX_BATCH: usize = 163;
1169
1170        if entity_ids.is_empty() {
1171            return Ok(HashMap::new());
1172        }
1173
1174        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1175        for chunk in entity_ids.chunks(MAX_BATCH) {
1176            let n = chunk.len();
1177            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1178            let ph1 = placeholder_list(1, n);
1179            let ph2 = placeholder_list(n + 1, n);
1180            let ph3 = placeholder_list(n * 2 + 1, n);
1181
1182            // Build query: count degree and distinct edge types for each entity.
1183            let sql = format!(
1184                "SELECT entity_id,
1185                        COUNT(*) AS degree,
1186                        COUNT(DISTINCT edge_type) AS type_diversity
1187                 FROM (
1188                     SELECT source_entity_id AS entity_id, edge_type
1189                     FROM graph_edges
1190                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1191                     UNION ALL
1192                     SELECT target_entity_id AS entity_id, edge_type
1193                     FROM graph_edges
1194                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1195                 )
1196                 WHERE entity_id IN ({ph3})
1197                 GROUP BY entity_id"
1198            );
1199
1200            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1201            // Bind chunk three times (three IN clauses)
1202            for id in chunk {
1203                query = query.bind(*id);
1204            }
1205            for id in chunk {
1206                query = query.bind(*id);
1207            }
1208            for id in chunk {
1209                query = query.bind(*id);
1210            }
1211
1212            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1213            all_rows.extend(chunk_rows);
1214        }
1215
1216        if all_rows.is_empty() {
1217            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1218        }
1219
1220        let max_degree = all_rows
1221            .iter()
1222            .map(|(_, d, _)| *d)
1223            .max()
1224            .unwrap_or(1)
1225            .max(1);
1226
1227        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1228        for (entity_id, degree, type_diversity) in all_rows {
1229            #[allow(clippy::cast_precision_loss)]
1230            let norm_degree = degree as f32 / max_degree as f32;
1231            #[allow(clippy::cast_precision_loss)]
1232            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1233            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1234            scores.insert(entity_id, score);
1235        }
1236
1237        Ok(scores)
1238    }
1239
1240    /// Look up community IDs for a batch of entity IDs.
1241    ///
1242    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1243    /// are absent from the map (treated as `None` by callers — no community cap applied).
1244    ///
1245    /// # Errors
1246    ///
1247    /// Returns an error if the database query fails.
1248    #[cfg(any(feature = "sqlite", feature = "postgres"))]
1249    pub async fn entity_community_ids(
1250        &self,
1251        entity_ids: &[i64],
1252    ) -> Result<HashMap<i64, i64>, MemoryError> {
1253        const MAX_BATCH: usize = 490;
1254
1255        if entity_ids.is_empty() {
1256            return Ok(HashMap::new());
1257        }
1258
1259        let mut result: HashMap<i64, i64> = HashMap::new();
1260        for chunk in entity_ids.chunks(MAX_BATCH) {
1261            let placeholders = placeholder_list(1, chunk.len());
1262
1263            let community_sql = community_ids_sql(&placeholders);
1264            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1265            for id in chunk {
1266                query = query.bind(*id);
1267            }
1268
1269            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1270            result.extend(rows);
1271        }
1272
1273        Ok(result)
1274    }
1275
1276    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1277    ///
1278    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1279    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1280    ///
1281    /// # Errors
1282    ///
1283    /// Returns an error if the database query fails.
1284    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1285        const MAX_BATCH: usize = 490;
1286        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1287        for chunk in edge_ids.chunks(MAX_BATCH) {
1288            let edge_placeholders = placeholder_list(1, chunk.len());
1289            let retrieval_sql = format!(
1290                "UPDATE graph_edges \
1291                 SET retrieval_count = retrieval_count + 1, \
1292                     last_retrieved_at = {epoch_now} \
1293                 WHERE id IN ({edge_placeholders})"
1294            );
1295            let mut q = zeph_db::query(&retrieval_sql);
1296            for id in chunk {
1297                q = q.bind(*id);
1298            }
1299            q.execute(&self.pool).await?;
1300        }
1301        Ok(())
1302    }
1303
1304    /// Increment `weight` on the set of edges traversed during the current recall (HL-F2, #3344).
1305    ///
1306    /// Mirrors [`Self::record_edge_retrieval`] in shape: same `MAX_BATCH = 490` chunking,
1307    /// same `WHERE id IN (…) AND valid_to IS NULL` filter (defensive — traversed edges should
1308    /// already be active, but this prevents reinforcing tombstoned edges).
1309    ///
1310    /// No-op when `edge_ids` is empty or `delta == 0.0`.
1311    ///
1312    /// # Errors
1313    ///
1314    /// Returns an error if the underlying `UPDATE` fails.
1315    #[tracing::instrument(
1316        name = "memory.graph.hebbian_increment",
1317        skip_all,
1318        fields(edge_count = edge_ids.len())
1319    )]
1320    pub async fn apply_hebbian_increment(
1321        &self,
1322        edge_ids: &[i64],
1323        delta: f32,
1324    ) -> Result<(), MemoryError> {
1325        // MAX_BATCH chosen to stay under SQLite's 999 host-parameter limit
1326        // ($1 = delta, $2..$N+1 = edge ids — 1 + 490 = 491 params per chunk).
1327        const MAX_BATCH: usize = 490;
1328        if edge_ids.is_empty() || delta == 0.0 {
1329            return Ok(());
1330        }
1331        for chunk in edge_ids.chunks(MAX_BATCH) {
1332            let edge_placeholders = placeholder_list(2, chunk.len());
1333            let sql = format!(
1334                "UPDATE graph_edges \
1335                 SET weight = weight + $1 \
1336                 WHERE id IN ({edge_placeholders}) \
1337                   AND valid_to IS NULL"
1338            );
1339            let mut q = zeph_db::query(&sql);
1340            q = q.bind(f64::from(delta));
1341            for id in chunk {
1342                q = q.bind(*id);
1343            }
1344            q.execute(&self.pool).await?;
1345        }
1346        Ok(())
1347    }
1348
1349    /// Return the subset of `ids` that exist in `graph_entities`.
1350    ///
1351    /// Useful for cross-referencing Qdrant-side entity IDs against the `SQLite` truth.
1352    /// Processes in chunks of 490 to stay under the `SQLite` variable limit (~32 k).
1353    ///
1354    /// # Errors
1355    ///
1356    /// Returns an error if the database query fails.
1357    pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1358        const MAX_BATCH: usize = 490;
1359        if ids.is_empty() {
1360            return Ok(Vec::new());
1361        }
1362        // TODO: chunk when ids.len() > 5_000 to guard against extreme cases
1363        let mut result = Vec::with_capacity(ids.len());
1364        for chunk in ids.chunks(MAX_BATCH) {
1365            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1366            let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1367            let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1368            for id in chunk {
1369                q = q.bind(*id);
1370            }
1371            let rows = q.fetch_all(&self.pool).await?;
1372            for (id,) in rows {
1373                result.push(id);
1374            }
1375        }
1376        Ok(result)
1377    }
1378
1379    /// Resolve entity IDs to their Qdrant point IDs in a single batched `SELECT` (HL-F5, #3346).
1380    ///
1381    /// Entities without a `qdrant_point_id` are silently omitted from the result.
1382    ///
1383    /// # Errors
1384    ///
1385    /// Returns an error if the database query fails.
1386    pub async fn qdrant_point_ids_for_entities(
1387        &self,
1388        entity_ids: &[i64],
1389    ) -> Result<HashMap<i64, String>, MemoryError> {
1390        // Chunk to stay under SQLite variable limit.
1391        const MAX_BATCH: usize = 490;
1392        if entity_ids.is_empty() {
1393            return Ok(HashMap::new());
1394        }
1395        let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1396        for chunk in entity_ids.chunks(MAX_BATCH) {
1397            let placeholders = zeph_db::placeholder_list(1, chunk.len());
1398            let sql = format!(
1399                "SELECT id, qdrant_point_id \
1400                 FROM graph_entities \
1401                 WHERE id IN ({placeholders}) \
1402                   AND qdrant_point_id IS NOT NULL"
1403            );
1404            let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1405            for id in chunk {
1406                q = q.bind(*id);
1407            }
1408            let rows = q.fetch_all(&self.pool).await?;
1409            for (entity_id, point_id) in rows {
1410                result.insert(entity_id, point_id);
1411            }
1412        }
1413        Ok(result)
1414    }
1415
1416    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1417    ///
1418    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1419    /// are updated. Returns the number of rows affected.
1420    ///
1421    /// # Errors
1422    ///
1423    /// Returns an error if the database query fails.
1424    pub async fn decay_edge_retrieval_counts(
1425        &self,
1426        decay_lambda: f64,
1427        interval_secs: u64,
1428    ) -> Result<usize, MemoryError> {
1429        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1430        let decay_raw = format!(
1431            "UPDATE graph_edges \
1432             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1433             WHERE valid_to IS NULL \
1434               AND retrieval_count > 0 \
1435               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1436        );
1437        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1438        let result = zeph_db::query(&decay_sql)
1439            .bind(decay_lambda)
1440            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1441            .execute(&self.pool)
1442            .await?;
1443        Ok(usize::try_from(result.rows_affected())?)
1444    }
1445
1446    /// Delete expired edges older than `retention_days` and return count deleted.
1447    ///
1448    /// # Errors
1449    ///
1450    /// Returns an error if the database query fails.
1451    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1452        let days = i64::from(retention_days);
1453        let result = zeph_db::query(sql!(
1454            "DELETE FROM graph_edges
1455             WHERE expired_at IS NOT NULL
1456               AND expired_at < datetime('now', '-' || ? || ' days')"
1457        ))
1458        .bind(days)
1459        .execute(&self.pool)
1460        .await?;
1461        Ok(usize::try_from(result.rows_affected())?)
1462    }
1463
1464    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1465    ///
1466    /// # Errors
1467    ///
1468    /// Returns an error if the database query fails.
1469    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1470        let days = i64::from(retention_days);
1471        let result = zeph_db::query(sql!(
1472            "DELETE FROM graph_entities
1473             WHERE id NOT IN (
1474                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1475                 UNION
1476                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1477             )
1478             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1479        ))
1480        .bind(days)
1481        .execute(&self.pool)
1482        .await?;
1483        Ok(usize::try_from(result.rows_affected())?)
1484    }
1485
1486    /// Delete the oldest excess entities when count exceeds `max_entities`.
1487    ///
1488    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1489    /// Only deletes when `entity_count() > max_entities`.
1490    ///
1491    /// # Errors
1492    ///
1493    /// Returns an error if the database query fails.
1494    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1495        let current = self.entity_count().await?;
1496        let max = i64::try_from(max_entities)?;
1497        if current <= max {
1498            return Ok(0);
1499        }
1500        let excess = current - max;
1501        let result = zeph_db::query(sql!(
1502            "DELETE FROM graph_entities
1503             WHERE id IN (
1504                 SELECT e.id
1505                 FROM graph_entities e
1506                 LEFT JOIN (
1507                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1508                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1509                     UNION ALL
1510                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1511                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1512                 ) edge_counts ON e.id = edge_counts.eid
1513                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1514                 LIMIT ?
1515             )"
1516        ))
1517        .bind(excess)
1518        .execute(&self.pool)
1519        .await?;
1520        Ok(usize::try_from(result.rows_affected())?)
1521    }
1522
1523    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1524
1525    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1526    ///
1527    /// An edge is valid at `timestamp` when:
1528    /// - `valid_from <= timestamp`, AND
1529    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1530    ///
1531    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1532    ///
1533    /// # Errors
1534    ///
1535    /// Returns an error if the database query fails.
1536    pub async fn edges_at_timestamp(
1537        &self,
1538        entity_id: i64,
1539        timestamp: &str,
1540    ) -> Result<Vec<Edge>, MemoryError> {
1541        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1542        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1543        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1544        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1545            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1546                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1547                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1548             FROM graph_edges
1549             WHERE valid_to IS NULL
1550               AND valid_from <= ?
1551               AND (source_entity_id = ? OR target_entity_id = ?)
1552             UNION ALL
1553             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1554                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1555                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1556             FROM graph_edges
1557             WHERE valid_to IS NOT NULL
1558               AND valid_from <= ?
1559               AND valid_to > ?
1560               AND (source_entity_id = ? OR target_entity_id = ?)"
1561        ))
1562        .bind(timestamp)
1563        .bind(entity_id)
1564        .bind(entity_id)
1565        .bind(timestamp)
1566        .bind(timestamp)
1567        .bind(entity_id)
1568        .bind(entity_id)
1569        .fetch_all(&self.pool)
1570        .await?;
1571        Ok(rows.into_iter().map(edge_from_row).collect())
1572    }
1573
1574    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1575    ///
1576    /// The optional `relation` filter restricts results to a specific relation label.
1577    /// Results are ordered by `valid_from DESC` (most recent first).
1578    ///
1579    /// # Errors
1580    ///
1581    /// Returns an error if the database query fails.
1582    pub async fn edge_history(
1583        &self,
1584        source_entity_id: i64,
1585        predicate: &str,
1586        relation: Option<&str>,
1587        limit: usize,
1588    ) -> Result<Vec<Edge>, MemoryError> {
1589        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1590        let escaped = predicate
1591            .replace('\\', "\\\\")
1592            .replace('%', "\\%")
1593            .replace('_', "\\_");
1594        let like_pattern = format!("%{escaped}%");
1595        let limit = i64::try_from(limit)?;
1596        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1597            zeph_db::query_as(sql!(
1598                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1599                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1600                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1601                 FROM graph_edges
1602                 WHERE source_entity_id = ?
1603                   AND fact LIKE ? ESCAPE '\\'
1604                   AND relation = ?
1605                 ORDER BY valid_from DESC
1606                 LIMIT ?"
1607            ))
1608            .bind(source_entity_id)
1609            .bind(&like_pattern)
1610            .bind(rel)
1611            .bind(limit)
1612            .fetch_all(&self.pool)
1613            .await?
1614        } else {
1615            zeph_db::query_as(sql!(
1616                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1617                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1618                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1619                 FROM graph_edges
1620                 WHERE source_entity_id = ?
1621                   AND fact LIKE ? ESCAPE '\\'
1622                 ORDER BY valid_from DESC
1623                 LIMIT ?"
1624            ))
1625            .bind(source_entity_id)
1626            .bind(&like_pattern)
1627            .bind(limit)
1628            .fetch_all(&self.pool)
1629            .await?
1630        };
1631        Ok(rows.into_iter().map(edge_from_row).collect())
1632    }
1633
1634    // ── BFS Traversal ─────────────────────────────────────────────────────────
1635
1636    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1637    ///
1638    /// Returns all reachable entities and the active edges connecting them.
1639    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1640    /// of `SQLite` CTE limitations.
1641    ///
1642    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1643    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1644    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1645    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1646    ///
1647    /// # Errors
1648    ///
1649    /// Returns an error if any database query fails.
1650    pub async fn bfs(
1651        &self,
1652        start_entity_id: i64,
1653        max_hops: u32,
1654    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1655        self.bfs_with_depth(start_entity_id, max_hops)
1656            .await
1657            .map(|(e, ed, _)| (e, ed))
1658    }
1659
1660    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1661    ///
1662    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1663    /// entity. The start entity itself has depth 0.
1664    ///
1665    /// **`SQLite` bind parameter limit**: see [`Self::bfs`] for notes on frontier size limits.
1666    ///
1667    /// # Errors
1668    ///
1669    /// Returns an error if any database query fails.
1670    pub async fn bfs_with_depth(
1671        &self,
1672        start_entity_id: i64,
1673        max_hops: u32,
1674    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1675        self.bfs_core(start_entity_id, max_hops, None).await
1676    }
1677
1678    /// BFS traversal considering only edges that were valid at `timestamp`.
1679    ///
1680    /// Equivalent to [`Self::bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1681    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1682    ///
1683    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1684    ///
1685    /// # Errors
1686    ///
1687    /// Returns an error if any database query fails.
1688    pub async fn bfs_at_timestamp(
1689        &self,
1690        start_entity_id: i64,
1691        max_hops: u32,
1692        timestamp: &str,
1693    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1694        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1695            .await
1696    }
1697
1698    /// BFS traversal scoped to specific MAGMA edge types.
1699    ///
1700    /// When `edge_types` is empty, behaves identically to [`Self::bfs_with_depth`] (traverses all
1701    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1702    /// matches one of the provided types.
1703    ///
1704    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1705    /// edges, a temporal query only temporal + semantic edges, etc.
1706    ///
1707    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1708    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1709    ///
1710    /// # Errors
1711    ///
1712    /// Returns an error if any database query fails.
1713    pub async fn bfs_typed(
1714        &self,
1715        start_entity_id: i64,
1716        max_hops: u32,
1717        edge_types: &[EdgeType],
1718    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1719        if edge_types.is_empty() {
1720            return self.bfs_with_depth(start_entity_id, max_hops).await;
1721        }
1722        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1723            .await
1724    }
1725
1726    /// Shared BFS implementation.
1727    ///
1728    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1729    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1730    ///
1731    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1732    /// format string, so there is no SQL injection risk.
1733    async fn bfs_core(
1734        &self,
1735        start_entity_id: i64,
1736        max_hops: u32,
1737        at_timestamp: Option<&str>,
1738    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1739        use std::collections::HashMap;
1740
1741        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1742        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1743        const MAX_FRONTIER: usize = 300;
1744
1745        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1746        let mut frontier: Vec<i64> = vec![start_entity_id];
1747        depth_map.insert(start_entity_id, 0);
1748
1749        for hop in 0..max_hops {
1750            if frontier.is_empty() {
1751                break;
1752            }
1753            frontier.truncate(MAX_FRONTIER);
1754            // IDs come from our own DB — no user input, no injection risk.
1755            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1756            // Timestamp (if any) follows at position 3n+1.
1757            let n = frontier.len();
1758            let ph1 = placeholder_list(1, n);
1759            let ph2 = placeholder_list(n + 1, n);
1760            let ph3 = placeholder_list(n * 2 + 1, n);
1761            let edge_filter = if at_timestamp.is_some() {
1762                let ts_pos = n * 3 + 1;
1763                format!(
1764                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1765                    ts = numbered_placeholder(ts_pos),
1766                )
1767            } else {
1768                "valid_to IS NULL".to_owned()
1769            };
1770            let neighbour_sql = format!(
1771                "SELECT DISTINCT CASE
1772                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1773                     ELSE source_entity_id
1774                 END as neighbour_id
1775                 FROM graph_edges
1776                 WHERE {edge_filter}
1777                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1778            );
1779            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1780            for id in &frontier {
1781                q = q.bind(*id);
1782            }
1783            for id in &frontier {
1784                q = q.bind(*id);
1785            }
1786            for id in &frontier {
1787                q = q.bind(*id);
1788            }
1789            if let Some(ts) = at_timestamp {
1790                q = q.bind(ts);
1791            }
1792            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1793            let mut next_frontier: Vec<i64> = Vec::new();
1794            for nbr in neighbours {
1795                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1796                    e.insert(hop + 1);
1797                    next_frontier.push(nbr);
1798                }
1799            }
1800            frontier = next_frontier;
1801        }
1802
1803        self.bfs_fetch_results(depth_map, at_timestamp).await
1804    }
1805
1806    /// BFS implementation scoped to specific edge types.
1807    ///
1808    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1809    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1810    ///
1811    /// # Errors
1812    ///
1813    /// Returns an error if any database query fails.
1814    async fn bfs_core_typed(
1815        &self,
1816        start_entity_id: i64,
1817        max_hops: u32,
1818        at_timestamp: Option<&str>,
1819        edge_types: &[EdgeType],
1820    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1821        use std::collections::HashMap;
1822
1823        const MAX_FRONTIER: usize = 300;
1824
1825        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1826
1827        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1828        let mut frontier: Vec<i64> = vec![start_entity_id];
1829        depth_map.insert(start_entity_id, 0);
1830
1831        let n_types = type_strs.len();
1832        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1833        let type_in = placeholder_list(1, n_types);
1834        let id_start = n_types + 1;
1835
1836        for hop in 0..max_hops {
1837            if frontier.is_empty() {
1838                break;
1839            }
1840            frontier.truncate(MAX_FRONTIER);
1841
1842            let n_frontier = frontier.len();
1843            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1844            let fp1 = placeholder_list(id_start, n_frontier);
1845            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1846            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1847
1848            let edge_filter = if at_timestamp.is_some() {
1849                let ts_pos = id_start + n_frontier * 3;
1850                format!(
1851                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1852                    ts = numbered_placeholder(ts_pos),
1853                )
1854            } else {
1855                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1856            };
1857
1858            let neighbour_sql = format!(
1859                "SELECT DISTINCT CASE
1860                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1861                     ELSE source_entity_id
1862                 END as neighbour_id
1863                 FROM graph_edges
1864                 WHERE {edge_filter}
1865                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1866            );
1867
1868            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1869            // Bind types first
1870            for t in &type_strs {
1871                q = q.bind(*t);
1872            }
1873            // Bind frontier 3 times
1874            for id in &frontier {
1875                q = q.bind(*id);
1876            }
1877            for id in &frontier {
1878                q = q.bind(*id);
1879            }
1880            for id in &frontier {
1881                q = q.bind(*id);
1882            }
1883            if let Some(ts) = at_timestamp {
1884                q = q.bind(ts);
1885            }
1886
1887            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1888            let mut next_frontier: Vec<i64> = Vec::new();
1889            for nbr in neighbours {
1890                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1891                    e.insert(hop + 1);
1892                    next_frontier.push(nbr);
1893                }
1894            }
1895            frontier = next_frontier;
1896        }
1897
1898        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1899        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1900            .await
1901    }
1902
1903    /// Fetch entities and typed edges for a completed BFS depth map.
1904    ///
1905    /// Filters returned edges by the provided `edge_type` strings.
1906    ///
1907    /// # Errors
1908    ///
1909    /// Returns an error if any database query fails.
1910    async fn bfs_fetch_results_typed(
1911        &self,
1912        depth_map: std::collections::HashMap<i64, u32>,
1913        at_timestamp: Option<&str>,
1914        type_strs: &[&str],
1915    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1916        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1917        if visited_ids.is_empty() {
1918            return Ok((Vec::new(), Vec::new(), depth_map));
1919        }
1920        if visited_ids.len() > 499 {
1921            tracing::warn!(
1922                total = visited_ids.len(),
1923                retained = 499,
1924                "bfs_fetch_results_typed: visited entity set truncated to 499"
1925            );
1926            visited_ids.truncate(499);
1927        }
1928
1929        let n_types = type_strs.len();
1930        let n_visited = visited_ids.len();
1931
1932        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1933        let type_in = placeholder_list(1, n_types);
1934        let id_start = n_types + 1;
1935        let ph_ids1 = placeholder_list(id_start, n_visited);
1936        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1937
1938        let edge_filter = if at_timestamp.is_some() {
1939            let ts_pos = id_start + n_visited * 2;
1940            format!(
1941                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1942                ts = numbered_placeholder(ts_pos),
1943            )
1944        } else {
1945            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1946        };
1947
1948        let edge_sql = format!(
1949            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1950                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1951                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1952             FROM graph_edges
1953             WHERE {edge_filter}
1954               AND source_entity_id IN ({ph_ids1})
1955               AND target_entity_id IN ({ph_ids2})"
1956        );
1957        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1958        for t in type_strs {
1959            edge_query = edge_query.bind(*t);
1960        }
1961        for id in &visited_ids {
1962            edge_query = edge_query.bind(*id);
1963        }
1964        for id in &visited_ids {
1965            edge_query = edge_query.bind(*id);
1966        }
1967        if let Some(ts) = at_timestamp {
1968            edge_query = edge_query.bind(ts);
1969        }
1970        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1971
1972        // For entity query, use plain sequential bind positions (no type prefix offset)
1973        let entity_sql2 = format!(
1974            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1975             FROM graph_entities WHERE id IN ({ph})",
1976            ph = placeholder_list(1, visited_ids.len()),
1977        );
1978        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1979        for id in &visited_ids {
1980            entity_query = entity_query.bind(*id);
1981        }
1982        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1983
1984        let entities: Vec<Entity> = entity_rows
1985            .into_iter()
1986            .map(entity_from_row)
1987            .collect::<Result<Vec<_>, _>>()?;
1988        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1989
1990        Ok((entities, edges, depth_map))
1991    }
1992
1993    /// Fetch entities and edges for a completed BFS depth map.
1994    async fn bfs_fetch_results(
1995        &self,
1996        depth_map: std::collections::HashMap<i64, u32>,
1997        at_timestamp: Option<&str>,
1998    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1999        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
2000        if visited_ids.is_empty() {
2001            return Ok((Vec::new(), Vec::new(), depth_map));
2002        }
2003        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
2004        if visited_ids.len() > 499 {
2005            tracing::warn!(
2006                total = visited_ids.len(),
2007                retained = 499,
2008                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
2009                 some reachable entities will be dropped from results"
2010            );
2011            visited_ids.truncate(499);
2012        }
2013
2014        let n = visited_ids.len();
2015        let ph_ids1 = placeholder_list(1, n);
2016        let ph_ids2 = placeholder_list(n + 1, n);
2017        let edge_filter = if at_timestamp.is_some() {
2018            let ts_pos = n * 2 + 1;
2019            format!(
2020                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
2021                ts = numbered_placeholder(ts_pos),
2022            )
2023        } else {
2024            "valid_to IS NULL".to_owned()
2025        };
2026        let edge_sql = format!(
2027            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
2028                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
2029                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
2030             FROM graph_edges
2031             WHERE {edge_filter}
2032               AND source_entity_id IN ({ph_ids1})
2033               AND target_entity_id IN ({ph_ids2})"
2034        );
2035        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
2036        for id in &visited_ids {
2037            edge_query = edge_query.bind(*id);
2038        }
2039        for id in &visited_ids {
2040            edge_query = edge_query.bind(*id);
2041        }
2042        if let Some(ts) = at_timestamp {
2043            edge_query = edge_query.bind(ts);
2044        }
2045        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
2046
2047        let entity_sql = format!(
2048            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
2049             FROM graph_entities WHERE id IN ({ph})",
2050            ph = placeholder_list(1, n),
2051        );
2052        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
2053        for id in &visited_ids {
2054            entity_query = entity_query.bind(*id);
2055        }
2056        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
2057
2058        let entities: Vec<Entity> = entity_rows
2059            .into_iter()
2060            .map(entity_from_row)
2061            .collect::<Result<Vec<_>, _>>()?;
2062        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
2063
2064        Ok((entities, edges, depth_map))
2065    }
2066
2067    // ── Backfill helpers ──────────────────────────────────────────────────────
2068
2069    /// Find an entity by name only (no type filter).
2070    ///
2071    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
2072    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
2073    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
2074    ///
2075    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
2076    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
2077    /// contains "Alice").
2078    ///
2079    /// # Errors
2080    ///
2081    /// Returns an error if the database query fails.
2082    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2083        let find_by_name_sql = format!(
2084            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2085             FROM graph_entities \
2086             WHERE name = ? {cn} OR canonical_name = ? {cn} \
2087             LIMIT 5",
2088            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2089        );
2090        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2091            .bind(name)
2092            .bind(name)
2093            .fetch_all(&self.pool)
2094            .await?;
2095
2096        if !rows.is_empty() {
2097            return rows.into_iter().map(entity_from_row).collect();
2098        }
2099
2100        self.find_entities_fuzzy(name, 5).await
2101    }
2102
2103    /// Return up to `limit` messages that have not yet been processed by graph extraction.
2104    ///
2105    /// Reads the `graph_processed` column added by migration 021.
2106    ///
2107    /// # Errors
2108    ///
2109    /// Returns an error if the database query fails.
2110    pub async fn unprocessed_messages_for_backfill(
2111        &self,
2112        limit: usize,
2113    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2114        let limit = i64::try_from(limit)?;
2115        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2116            "SELECT id, content FROM messages
2117             WHERE graph_processed = 0
2118             ORDER BY id ASC
2119             LIMIT ?"
2120        ))
2121        .bind(limit)
2122        .fetch_all(&self.pool)
2123        .await?;
2124        Ok(rows
2125            .into_iter()
2126            .map(|(id, content)| (crate::types::MessageId(id), content))
2127            .collect())
2128    }
2129
2130    /// Return the count of messages not yet processed by graph extraction.
2131    ///
2132    /// # Errors
2133    ///
2134    /// Returns an error if the database query fails.
2135    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2136        let count: i64 = zeph_db::query_scalar(sql!(
2137            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2138        ))
2139        .fetch_one(&self.pool)
2140        .await?;
2141        Ok(count)
2142    }
2143
2144    /// Mark a batch of messages as graph-processed.
2145    ///
2146    /// # Errors
2147    ///
2148    /// Returns an error if the database query fails.
2149    pub async fn mark_messages_graph_processed(
2150        &self,
2151        ids: &[crate::types::MessageId],
2152    ) -> Result<(), MemoryError> {
2153        const MAX_BATCH: usize = 490;
2154        if ids.is_empty() {
2155            return Ok(());
2156        }
2157        for chunk in ids.chunks(MAX_BATCH) {
2158            let placeholders = placeholder_list(1, chunk.len());
2159            let sql =
2160                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2161            let mut query = zeph_db::query(&sql);
2162            for id in chunk {
2163                query = query.bind(id.0);
2164            }
2165            query.execute(&self.pool).await?;
2166        }
2167        Ok(())
2168    }
2169}
2170
2171// ── Dialect helpers ───────────────────────────────────────────────────────────
2172
2173#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
/// Build the `SQLite` query that maps entity IDs to the communities listing them.
///
/// `placeholders` is spliced verbatim into the final `IN (...)` list; the query expands
/// `graph_communities.entity_ids` with `json_each` and yields `(entity_id, community_id)`.
fn community_ids_sql(placeholders: &str) -> String {
    const PREFIX: &str = concat!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id\n",
        "         FROM graph_communities c, json_each(c.entity_ids) j\n",
        "         WHERE CAST(j.value AS INTEGER) IN (",
    );
    let mut sql = String::with_capacity(PREFIX.len() + placeholders.len() + 1);
    sql.push_str(PREFIX);
    sql.push_str(placeholders);
    sql.push(')');
    sql
}
2181
2182#[cfg(feature = "postgres")]
/// Build the Postgres query that maps entity IDs to the communities listing them.
///
/// `placeholders` is spliced verbatim into the final `IN (...)` list; the query expands
/// `graph_communities.entity_ids` with `jsonb_array_elements_text` and yields
/// `(entity_id, community_id)`.
fn community_ids_sql(placeholders: &str) -> String {
    const PREFIX: &str = concat!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id\n",
        "         FROM graph_communities c,\n",
        "              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)\n",
        "         WHERE (j.value)::bigint IN (",
    );
    let mut sql = String::with_capacity(PREFIX.len() + placeholders.len() + 1);
    sql.push_str(PREFIX);
    sql.push_str(placeholders);
    sql.push(')');
    sql
}
2191
2192// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2193
/// Raw database row for an entity, as selected from `graph_entities`.
///
/// Field names match the selected column names; `entity_from_row` converts this into the
/// typed [`Entity`].
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    canonical_name: String,
    // Stored as text; parsed into `EntityType` by `entity_from_row`.
    entity_type: String,
    summary: Option<String>,
    // Timestamps are carried through as the strings the database returns.
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
2205
2206fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2207    let entity_type = row
2208        .entity_type
2209        .parse::<EntityType>()
2210        .map_err(MemoryError::GraphStore)?;
2211    Ok(Entity {
2212        id: EntityId(row.id),
2213        name: row.name,
2214        canonical_name: row.canonical_name,
2215        entity_type,
2216        summary: row.summary,
2217        first_seen_at: row.first_seen_at,
2218        last_seen_at: row.last_seen_at,
2219        qdrant_point_id: row.qdrant_point_id,
2220    })
2221}
2222
/// Raw database row for an entity alias; `alias_from_row` converts it into [`EntityAlias`].
///
/// NOTE(review): presumably read from the `graph_entity_aliases` table — the query is not
/// visible next to this definition; confirm against the alias lookups.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Raw ID of the owning entity; wrapped in `EntityId` by `alias_from_row`.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2230
2231fn alias_from_row(row: AliasRow) -> EntityAlias {
2232    EntityAlias {
2233        id: row.id,
2234        entity_id: EntityId(row.entity_id),
2235        alias_name: row.alias_name,
2236        created_at: row.created_at,
2237    }
2238}
2239
/// Raw database row for an edge, as selected from `graph_edges`.
///
/// Field names match the selected column names except where renamed below;
/// `edge_from_row` converts this into the typed [`Edge`].
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Read as f64 and narrowed to f32 by `edge_from_row`.
    confidence: f64,
    valid_from: String,
    // `None` marks an open-ended (still active) edge in the queries of this module.
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // The `episode_id` column is exposed under the field name `source_message_id`.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` by `edge_from_row`.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    superseded_by: Option<i64>,
    // When absent, `edge_from_row` falls back to `relation`.
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    // Hebbian reinforcement weight (HL-F1, #3344). SQLite REAL maps to f64 via sqlx.
    weight: f64,
    // SYNAPSE multi-timescale variables (#3709).
    confidence_fast: f64,
    confidence_slow: f64,
    turn_index: Option<i64>,
}
2268
2269fn edge_from_row(row: EdgeRow) -> Edge {
2270    let edge_type = row
2271        .edge_type
2272        .parse::<EdgeType>()
2273        .unwrap_or(EdgeType::Semantic);
2274    let canonical_relation = row
2275        .canonical_relation
2276        .unwrap_or_else(|| row.relation.clone());
2277    Edge {
2278        id: row.id,
2279        source_entity_id: row.source_entity_id,
2280        target_entity_id: row.target_entity_id,
2281        canonical_relation,
2282        relation: row.relation,
2283        fact: row.fact,
2284        #[allow(clippy::cast_possible_truncation)]
2285        confidence: row.confidence as f32,
2286        valid_from: row.valid_from,
2287        valid_to: row.valid_to,
2288        created_at: row.created_at,
2289        expired_at: row.expired_at,
2290        source_message_id: row.source_message_id.map(MessageId),
2291        qdrant_point_id: row.qdrant_point_id,
2292        edge_type,
2293        retrieval_count: row.retrieval_count,
2294        last_retrieved_at: row.last_retrieved_at,
2295        superseded_by: row.superseded_by,
2296        supersedes: row.supersedes,
2297        #[allow(clippy::cast_possible_truncation)]
2298        weight: row.weight as f32,
2299        #[allow(clippy::cast_possible_truncation)]
2300        confidence_fast: row.confidence_fast as f32,
2301        #[allow(clippy::cast_possible_truncation)]
2302        confidence_slow: row.confidence_slow as f32,
2303        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2304        turn_index: row.turn_index.map(|v| v as u32),
2305    }
2306}
2307
2308#[derive(zeph_db::FromRow)]
2309struct CommunityRow {
2310    id: i64,
2311    name: String,
2312    summary: String,
2313    entity_ids: String,
2314    fingerprint: Option<String>,
2315    created_at: String,
2316    updated_at: String,
2317}
2318
2319// ── GAAMA Episode methods ──────────────────────────────────────────────────────
2320
2321impl GraphStore {
2322    /// Ensure a GAAMA episode exists for this conversation, returning its ID.
2323    ///
2324    /// Idempotent: inserts on first call, returns existing ID on subsequent calls.
2325    ///
2326    /// # Errors
2327    ///
2328    /// Returns an error if the database query fails.
2329    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2330        // Ensure the conversation row exists before inserting into graph_episodes,
2331        // which has a FK referencing conversations(id). On a fresh database the agent
2332        // may run graph extraction before the conversation row is committed.
2333        zeph_db::query(sql!(
2334            "INSERT INTO conversations (id) VALUES (?)
2335             ON CONFLICT (id) DO NOTHING"
2336        ))
2337        .bind(conversation_id)
2338        .execute(&self.pool)
2339        .await?;
2340
2341        let id: i64 = zeph_db::query_scalar(sql!(
2342            "INSERT INTO graph_episodes (conversation_id)
2343             VALUES (?)
2344             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2345             RETURNING id"
2346        ))
2347        .bind(conversation_id)
2348        .fetch_one(&self.pool)
2349        .await?;
2350        Ok(id)
2351    }
2352
2353    /// Record that an entity was observed in an episode.
2354    ///
2355    /// Idempotent: does nothing if the link already exists.
2356    ///
2357    /// # Errors
2358    ///
2359    /// Returns an error if the database query fails.
2360    pub async fn link_entity_to_episode(
2361        &self,
2362        episode_id: i64,
2363        entity_id: i64,
2364    ) -> Result<(), MemoryError> {
2365        zeph_db::query(sql!(
2366            "INSERT INTO graph_episode_entities (episode_id, entity_id)
2367             VALUES (?, ?)
2368             ON CONFLICT (episode_id, entity_id) DO NOTHING"
2369        ))
2370        .bind(episode_id)
2371        .bind(entity_id)
2372        .execute(&self.pool)
2373        .await?;
2374        Ok(())
2375    }
2376
2377    /// Return all episodes in which an entity appears.
2378    ///
2379    /// # Errors
2380    ///
2381    /// Returns an error if the database query fails.
2382    pub async fn episodes_for_entity(
2383        &self,
2384        entity_id: i64,
2385    ) -> Result<Vec<super::types::Episode>, MemoryError> {
2386        #[derive(zeph_db::FromRow)]
2387        struct EpisodeRow {
2388            id: i64,
2389            conversation_id: i64,
2390            created_at: String,
2391            closed_at: Option<String>,
2392        }
2393        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2394            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2395             FROM graph_episodes e
2396             JOIN graph_episode_entities ee ON ee.episode_id = e.id
2397             WHERE ee.entity_id = ?"
2398        ))
2399        .bind(entity_id)
2400        .fetch_all(&self.pool)
2401        .await?;
2402        Ok(rows
2403            .into_iter()
2404            .map(|r| super::types::Episode {
2405                id: r.id,
2406                conversation_id: r.conversation_id,
2407                created_at: r.created_at,
2408                closed_at: r.closed_at,
2409            })
2410            .collect())
2411    }
2412
2413    /// Insert a new edge using APEX-MEM append-only supersede semantics (FR-001).
2414    ///
2415    /// If a byte-identical active edge exists for `(src, tgt, canonical_relation, edge_type)`,
2416    /// records a reassertion in `edge_reassertions` and returns the existing edge id (FR-015).
2417    ///
2418    /// If a different active edge exists for the same `(src, canonical_relation, edge_type)` key,
2419    /// invalidates it with a supersession pointer, inserts the new edge, and sets `supersedes` on
2420    /// the new row to link the chain. Checks that the resulting chain depth would not exceed
2421    /// [`SUPERSEDE_DEPTH_CAP`] and that no cycle would be introduced before committing.
2422    ///
2423    /// Optionally increments [`ApexMetrics::supersedes_total`] when `metrics` is provided and a
2424    /// supersession actually occurs.
2425    ///
2426    /// # Errors
2427    ///
2428    /// - [`MemoryError::SupersedeCycle`] — the new edge would create a supersede cycle.
2429    /// - [`MemoryError::SupersedeDepthExceeded`] — chain depth cap would be exceeded.
2430    /// - [`MemoryError::Sqlx`] / [`MemoryError::Db`] — database errors.
2431    #[allow(clippy::too_many_arguments)]
2432    // TODO(B3): refactor into a builder or config struct to reduce argument count
2433    // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2434    #[tracing::instrument(name = "memory.graph.insert_or_supersede", skip_all)]
2435    pub async fn insert_or_supersede_with_metrics(
2436        &self,
2437        source_entity_id: i64,
2438        target_entity_id: i64,
2439        relation: &str,
2440        canonical_relation: &str,
2441        fact: &str,
2442        confidence: f32,
2443        episode_id: Option<MessageId>,
2444        edge_type: EdgeType,
2445        set_supersedes: bool,
2446        metrics: Option<&ApexMetrics>,
2447    ) -> Result<i64, MemoryError> {
2448        self.insert_or_supersede_with_turn_index_and_metrics(
2449            source_entity_id,
2450            target_entity_id,
2451            relation,
2452            canonical_relation,
2453            fact,
2454            confidence,
2455            episode_id,
2456            edge_type,
2457            set_supersedes,
2458            metrics,
2459            None,
2460        )
2461        .await
2462    }
2463
2464    /// Insert or supersede an edge, recording turn-level provenance (#3710).
2465    ///
2466    /// `turn_index` is stored in the new edge row when a supersession chain is started.
2467    /// Pass `None` to omit provenance (equivalent to [`Self::insert_or_supersede_with_metrics`]).
2468    ///
2469    /// # Errors
2470    ///
2471    /// Returns an error if the database transaction fails.
2472    #[allow(clippy::too_many_arguments)]
2473    pub async fn insert_or_supersede_with_turn_index_and_metrics(
2474        &self,
2475        source_entity_id: i64,
2476        target_entity_id: i64,
2477        relation: &str,
2478        canonical_relation: &str,
2479        fact: &str,
2480        confidence: f32,
2481        episode_id: Option<MessageId>,
2482        edge_type: EdgeType,
2483        set_supersedes: bool,
2484        metrics: Option<&ApexMetrics>,
2485        turn_index: Option<u32>,
2486    ) -> Result<i64, MemoryError> {
2487        if source_entity_id == target_entity_id {
2488            return Err(MemoryError::InvalidInput(format!(
2489                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2490            )));
2491        }
2492        let confidence = confidence.clamp(0.0, 1.0);
2493        let edge_type_str = edge_type.as_str();
2494        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2495
2496        let mut tx = zeph_db::begin(&self.pool).await?;
2497
2498        if let Some(existing_id) = find_identical_active_edge(
2499            &mut tx,
2500            source_entity_id,
2501            target_entity_id,
2502            canonical_relation,
2503            edge_type_str,
2504            fact,
2505        )
2506        .await?
2507        {
2508            record_reassertion(
2509                &mut tx,
2510                existing_id,
2511                episode_raw,
2512                confidence,
2513                self.benna_fast_rate,
2514                self.benna_slow_rate,
2515            )
2516            .await?;
2517            tx.commit().await?;
2518            return Ok(existing_id);
2519        }
2520
2521        let prior_head =
2522            find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
2523                .await?;
2524
2525        // Cycle guard: depth cap also bounds cycles. Run inside the transaction using sqlx
2526        // directly to avoid a second pool acquire (SQLite pool is typically size 1 in tests).
2527        if let Some(head_id) = prior_head {
2528            check_supersede_depth_in_tx(&mut tx, head_id).await?;
2529        }
2530
2531        // Expire the prior head BEFORE inserting the new row so that the partial unique index
2532        // uq_graph_edges_active_head is not violated. SQLite enforces unique indexes at statement
2533        // level, not at transaction commit, so the deactivation must precede the INSERT.
2534        if let Some(head_id) = prior_head {
2535            expire_prior_head(&mut tx, head_id).await?;
2536        }
2537
2538        let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2539        let turn_index_raw: Option<i64> = turn_index.map(i64::from);
2540        let new_id = insert_new_edge(
2541            &mut tx,
2542            source_entity_id,
2543            target_entity_id,
2544            relation,
2545            canonical_relation,
2546            fact,
2547            confidence,
2548            episode_raw,
2549            edge_type_str,
2550            supersedes_val,
2551            turn_index_raw,
2552        )
2553        .await?;
2554
2555        if let Some(head_id) = prior_head {
2556            set_superseded_by(&mut tx, head_id, new_id).await?;
2557            if let Some(m) = metrics {
2558                m.supersedes_total
2559                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2560            }
2561        }
2562
2563        tx.commit().await?;
2564        Ok(new_id)
2565    }
2566
2567    /// Convenience wrapper: calls [`Self::insert_or_supersede_with_metrics`] with `metrics = None`.
2568    ///
2569    /// # Errors
2570    ///
2571    /// Propagates errors from [`Self::insert_or_supersede_with_metrics`].
2572    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2573    pub async fn insert_or_supersede(
2574        &self,
2575        source_entity_id: i64,
2576        target_entity_id: i64,
2577        relation: &str,
2578        canonical_relation: &str,
2579        fact: &str,
2580        confidence: f32,
2581        episode_id: Option<MessageId>,
2582        edge_type: EdgeType,
2583        set_supersedes: bool,
2584    ) -> Result<i64, MemoryError> {
2585        self.insert_or_supersede_with_metrics(
2586            source_entity_id,
2587            target_entity_id,
2588            relation,
2589            canonical_relation,
2590            fact,
2591            confidence,
2592            episode_id,
2593            edge_type,
2594            set_supersedes,
2595            None,
2596        )
2597        .await
2598    }
2599
2600    /// Insert or supersede an edge, then run implicit conflict detection if a detector is provided.
2601    ///
2602    /// Calls [`Self::insert_or_supersede_with_metrics`] first, then — when `detector` is `Some` and
2603    /// the ontology confirms the predicate is cardinality-1 — queries active edges on the same source
2604    /// entity, detects conflict candidates, and stages them in `implicit_conflict_candidates`.
2605    ///
2606    /// Conflict detection failures are logged and swallowed so they never block the write path.
2607    ///
2608    /// # Errors
2609    ///
2610    /// Propagates errors from [`Self::insert_or_supersede_with_metrics`].
2611    #[allow(clippy::too_many_arguments)] // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
2612    pub async fn insert_or_supersede_with_conflict_detection(
2613        &self,
2614        source_entity_id: i64,
2615        target_entity_id: i64,
2616        relation: &str,
2617        canonical_relation: &str,
2618        fact: &str,
2619        confidence: f32,
2620        episode_id: Option<MessageId>,
2621        edge_type: EdgeType,
2622        set_supersedes: bool,
2623        metrics: Option<&ApexMetrics>,
2624        detector: Option<&crate::graph::implicit_conflict::ImplicitConflictDetector>,
2625        ontology: Option<&crate::graph::ontology::OntologyTable>,
2626    ) -> Result<i64, MemoryError> {
2627        let new_id = self
2628            .insert_or_supersede_with_metrics(
2629                source_entity_id,
2630                target_entity_id,
2631                relation,
2632                canonical_relation,
2633                fact,
2634                confidence,
2635                episode_id,
2636                edge_type,
2637                set_supersedes,
2638                metrics,
2639            )
2640            .await?;
2641
2642        if let (Some(det), Some(onto)) = (detector, ontology)
2643            && det.is_enabled()
2644        {
2645            let is_cardinality_n =
2646                onto.cardinality(canonical_relation) == crate::graph::ontology::Cardinality::Many;
2647
2648            // Query existing active edges for the same source entity (excluding the new one).
2649            let existing_raw = sqlx::query(
2650                "SELECT id, canonical_relation FROM graph_edges
2651                     WHERE source_entity_id = ?
2652                       AND id != ?
2653                       AND expired_at IS NULL",
2654            )
2655            .bind(source_entity_id)
2656            .bind(new_id)
2657            .fetch_all(&self.pool)
2658            .await;
2659
2660            match existing_raw {
2661                Ok(rows) => {
2662                    use sqlx::Row as _;
2663                    let existing: Vec<(i64, String)> = rows
2664                        .into_iter()
2665                        .map(|r| {
2666                            let id: i64 = r.try_get("id").unwrap_or(0);
2667                            let rel: String = r.try_get("canonical_relation").unwrap_or_default();
2668                            (id, rel)
2669                        })
2670                        .collect();
2671                    let existing_refs: Vec<(i64, &str)> = existing
2672                        .iter()
2673                        .map(|(id, rel)| (*id, rel.as_str()))
2674                        .collect();
2675
2676                    let candidates = det.detect_candidates(
2677                        new_id,
2678                        canonical_relation,
2679                        &existing_refs,
2680                        is_cardinality_n,
2681                    );
2682
2683                    if !candidates.is_empty() {
2684                        let ttl = det.candidate_ttl_days();
2685                        match zeph_db::begin(&self.pool).await {
2686                            Ok(mut tx) => {
2687                                match det.stage_candidates(&candidates, &mut tx, ttl).await {
2688                                    Ok(()) => {
2689                                        if let Err(e) = tx.commit().await {
2690                                            tracing::warn!(
2691                                                error = %e,
2692                                                "implicit conflict tx commit failed (non-fatal)"
2693                                            );
2694                                        }
2695                                    }
2696                                    Err(e) => {
2697                                        tracing::warn!(
2698                                            error = %e,
2699                                            "implicit conflict staging failed (non-fatal)"
2700                                        );
2701                                    }
2702                                }
2703                            }
2704                            Err(e) => {
2705                                tracing::warn!(
2706                                    error = %e,
2707                                    "implicit conflict: tx begin failed (non-fatal)"
2708                                );
2709                            }
2710                        }
2711                    }
2712                }
2713                Err(e) => {
2714                    tracing::warn!(
2715                        error = %e,
2716                        "implicit conflict: failed to query existing edges (non-fatal)"
2717                    );
2718                }
2719            }
2720        }
2721
2722        Ok(new_id)
2723    }
2724
2725    /// Walk the `supersedes` chain from `head_id` using a single recursive CTE and return its depth.
2726    ///
2727    /// Returns `0` when the edge has no `supersedes` pointer (it is the root).
2728    /// The CTE is capped at `SUPERSEDE_DEPTH_CAP + 1` to prevent unbounded recursion.
2729    ///
2730    /// # Errors
2731    ///
2732    /// Returns [`MemoryError::SupersedeCycle`] when the CTE detects a cycle (depth exceeds cap
2733    /// but the chain has not terminated), or a database error on query failure.
2734    pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2735        Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2736    }
2737
2738    async fn check_supersede_depth_with_pool(
2739        pool: &zeph_db::DbPool,
2740        head_id: i64,
2741    ) -> Result<usize, MemoryError> {
2742        let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2743        // CTE walks the supersedes chain starting at head_id (depth=0).
2744        // Each hop to a superseded ancestor increments depth. NULL supersedes terminates the walk.
2745        let depth: Option<i64> = zeph_db::query_scalar(sql!(
2746            "WITH RECURSIVE chain(id, depth) AS (
2747               SELECT id, 0 FROM graph_edges WHERE id = ?
2748               UNION ALL
2749               SELECT e.supersedes, c.depth + 1
2750               FROM graph_edges e JOIN chain c ON e.id = c.id
2751               WHERE e.supersedes IS NOT NULL AND c.depth < ?
2752             )
2753             SELECT MAX(depth) FROM chain"
2754        ))
2755        .bind(head_id)
2756        .bind(cap)
2757        .fetch_optional(pool)
2758        .await?
2759        .flatten();
2760
2761        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2762        let d = depth.unwrap_or(0) as usize;
2763        if d > SUPERSEDE_DEPTH_CAP {
2764            return Err(MemoryError::SupersedeCycle(head_id));
2765        }
2766        Ok(d)
2767    }
2768
2769    // ── MemCoT provenance helpers (issue #3574 / #3575) ───────────────────────
2770
2771    /// Fetch `(edge_id, source_message_id)` pairs for a batch of edge ids.
2772    ///
2773    /// Returns only rows where `source_message_id` (`episode_id` column) is not NULL.
2774    /// Called from [`crate::semantic::SemanticMemory::recall_graph_view`] for Zoom-In provenance.
2775    ///
2776    /// # Errors
2777    ///
2778    /// Returns [`crate::error::MemoryError`] if the SQL query fails.
2779    pub async fn source_message_ids_for_edges(
2780        &self,
2781        edge_ids: &[i64],
2782    ) -> Result<Vec<(i64, crate::types::MessageId)>, crate::error::MemoryError> {
2783        if edge_ids.is_empty() {
2784            return Ok(Vec::new());
2785        }
2786        let placeholders = placeholder_list(1, edge_ids.len());
2787        let sql = format!(
2788            "SELECT id, episode_id FROM graph_edges \
2789             WHERE id IN ({placeholders}) AND episode_id IS NOT NULL"
2790        );
2791        let mut q = zeph_db::query_as::<_, (i64, crate::types::MessageId)>(&sql);
2792        for &eid in edge_ids {
2793            q = q.bind(eid);
2794        }
2795        let rows = q.fetch_all(&self.pool).await?;
2796        Ok(rows)
2797    }
2798
2799    /// Return the `source_entity_id` for a given edge id, or `None` if not found.
2800    ///
2801    /// Used by `recall_graph_view` with `ZoomOut` to seed the 1-hop BFS.
2802    ///
2803    /// # Errors
2804    ///
2805    /// Returns [`crate::error::MemoryError`] if the SQL query fails.
2806    pub async fn source_entity_id_for_edge(
2807        &self,
2808        edge_id: i64,
2809    ) -> Result<Option<i64>, crate::error::MemoryError> {
2810        let row: Option<i64> = zeph_db::query_scalar(sql!(
2811            "SELECT source_entity_id FROM graph_edges WHERE id = ?1 AND valid_to IS NULL LIMIT 1"
2812        ))
2813        .bind(edge_id)
2814        .fetch_optional(&self.pool)
2815        .await?;
2816        Ok(row)
2817    }
2818
2819    /// Fetch all active edges exactly 1 hop away from `entity_id`, filtered by `edge_types`.
2820    ///
2821    /// Returns a flat list of [`crate::recall_view::RecalledFact`] wrapping `GraphFact` values
2822    /// with `hop_distance = 1`. Used by `recall_graph_view` for `ZoomOut` neighbor expansion.
2823    ///
2824    /// # Errors
2825    ///
2826    /// Returns [`crate::error::MemoryError`] if the SQL query or entity name lookup fails.
2827    pub async fn bfs_edges_at_depth(
2828        &self,
2829        entity_id: i64,
2830        _depth: u32,
2831        edge_types: &[crate::graph::types::EdgeType],
2832    ) -> Result<Vec<crate::recall_view::RecalledFact>, crate::error::MemoryError> {
2833        let type_strs: Vec<&str> = if edge_types.is_empty() {
2834            vec!["semantic", "temporal", "causal", "entity"]
2835        } else {
2836            edge_types.iter().map(|et| et.as_str()).collect()
2837        };
2838
2839        let ph = placeholder_list(1, type_strs.len());
2840        let src_pos = type_strs.len() + 1;
2841        let tgt_pos = src_pos + 1;
2842        let src_ph = numbered_placeholder(src_pos);
2843        let tgt_ph = numbered_placeholder(tgt_pos);
2844
2845        let sql = format!(
2846            "SELECT ge.id, ge.source_entity_id, ge.target_entity_id, ge.relation, ge.fact,
2847                    ge.confidence, ge.valid_from, ge.valid_to, ge.created_at, ge.expired_at,
2848                    ge.episode_id, ge.qdrant_point_id, ge.edge_type, ge.retrieval_count,
2849                    ge.last_retrieved_at, ge.superseded_by, ge.canonical_relation, ge.supersedes,
2850                    ge.weight, ge.confidence_fast, ge.confidence_slow, ge.turn_index
2851             FROM graph_edges ge
2852             WHERE ge.edge_type IN ({ph})
2853               AND ge.valid_to IS NULL
2854               AND (ge.source_entity_id = {src_ph} OR ge.target_entity_id = {tgt_ph})
2855             LIMIT 200"
2856        );
2857
2858        let mut q = zeph_db::query_as::<_, EdgeRow>(&sql);
2859        for t in &type_strs {
2860            q = q.bind(*t);
2861        }
2862        q = q.bind(entity_id).bind(entity_id);
2863
2864        let rows = q.fetch_all(&self.pool).await?;
2865
2866        // Resolve entity names from ids.
2867        let all_ids: Vec<i64> = rows
2868            .iter()
2869            .flat_map(|r| [r.source_entity_id, r.target_entity_id])
2870            .collect::<std::collections::HashSet<_>>()
2871            .into_iter()
2872            .collect();
2873
2874        let mut name_map: std::collections::HashMap<i64, String> = std::collections::HashMap::new();
2875        for chunk in all_ids.chunks(490) {
2876            let ph2 = placeholder_list(1, chunk.len());
2877            let name_sql =
2878                format!("SELECT id, canonical_name FROM graph_entities WHERE id IN ({ph2})");
2879            let mut nq = zeph_db::query_as::<_, (i64, String)>(&name_sql);
2880            for &id in chunk {
2881                nq = nq.bind(id);
2882            }
2883            let name_rows = nq.fetch_all(&self.pool).await?;
2884            for (id, name) in name_rows {
2885                name_map.insert(id, name);
2886            }
2887        }
2888
2889        let facts: Vec<crate::recall_view::RecalledFact> = rows
2890            .into_iter()
2891            .filter_map(|row| {
2892                let edge = edge_from_row(row);
2893                let entity_name = name_map.get(&edge.source_entity_id).cloned()?;
2894                let target_name = name_map.get(&edge.target_entity_id).cloned()?;
2895                if entity_name.is_empty() || target_name.is_empty() {
2896                    return None;
2897                }
2898                let fact = crate::graph::types::GraphFact {
2899                    entity_name,
2900                    relation: edge.canonical_relation.clone(),
2901                    target_name,
2902                    fact: edge.fact.clone(),
2903                    entity_match_score: 0.5,
2904                    hop_distance: 1,
2905                    confidence: edge.confidence,
2906                    valid_from: if edge.valid_from.is_empty() {
2907                        None
2908                    } else {
2909                        Some(edge.valid_from.clone())
2910                    },
2911                    edge_type: edge.edge_type,
2912                    retrieval_count: edge.retrieval_count,
2913                    edge_id: Some(edge.id),
2914                };
2915                Some(crate::recall_view::RecalledFact::from_graph_fact(fact))
2916            })
2917            .collect();
2918
2919        Ok(facts)
2920    }
2921}
2922
2923// ── insert_or_supersede helpers ───────────────────────────────────────────────
2924// All helpers take `&mut zeph_db::DbTransaction` so they run inside the caller's transaction
2925// without acquiring a second connection (SQLite pool is typically size 1 in tests).
2926
2927type Tx<'a> = zeph_db::DbTransaction<'a>;
2928
2929/// Look up a byte-identical active edge (FR-015 reassertion path).
2930///
2931/// Returns the existing edge ID when all columns match exactly, or `None` otherwise.
2932async fn find_identical_active_edge(
2933    tx: &mut Tx<'_>,
2934    src: i64,
2935    tgt: i64,
2936    canon: &str,
2937    edge_type_str: &str,
2938    fact: &str,
2939) -> Result<Option<i64>, MemoryError> {
2940    Ok(zeph_db::query_scalar(sql!(
2941        "SELECT id FROM graph_edges
2942         WHERE source_entity_id = ?
2943           AND target_entity_id = ?
2944           AND canonical_relation = ?
2945           AND edge_type = ?
2946           AND fact = ?
2947           AND valid_to IS NULL
2948           AND expired_at IS NULL
2949         LIMIT 1"
2950    ))
2951    .bind(src)
2952    .bind(tgt)
2953    .bind(canon)
2954    .bind(edge_type_str)
2955    .bind(fact)
2956    .fetch_optional(&mut **tx)
2957    .await?)
2958}
2959
2960/// Record a reassertion event for an existing edge (FR-015) and apply the Benna-Fusi
2961/// multi-timescale update to `confidence_fast`/`confidence_slow` in `graph_edges` (#3709).
2962///
2963/// This ensures SYNAPSE synaptic variables are updated on every reassertion, including the
2964/// APEX append-only path where identical edges are not superseded but re-confirmed.
2965async fn record_reassertion(
2966    tx: &mut Tx<'_>,
2967    head_id: i64,
2968    episode_raw: Option<i64>,
2969    confidence: f32,
2970    benna_fast_rate: f32,
2971    benna_slow_rate: f32,
2972) -> Result<(), MemoryError> {
2973    #[allow(clippy::cast_possible_wrap)]
2974    let asserted_at = std::time::SystemTime::now()
2975        .duration_since(std::time::UNIX_EPOCH)
2976        .unwrap_or_default()
2977        .as_secs() as i64;
2978    zeph_db::query(sql!(
2979        "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2980         VALUES (?, ?, ?, ?)"
2981    ))
2982    .bind(head_id)
2983    .bind(asserted_at)
2984    .bind(episode_raw)
2985    .bind(f64::from(confidence))
2986    .execute(&mut **tx)
2987    .await?;
2988
2989    // Apply Benna-Fusi update to the head edge's synaptic variables.
2990    let existing: Option<(f64, f64)> = zeph_db::query_as(sql!(
2991        "SELECT confidence_fast, confidence_slow FROM graph_edges WHERE id = ? LIMIT 1"
2992    ))
2993    .bind(head_id)
2994    .fetch_optional(&mut **tx)
2995    .await?;
2996
2997    if let Some((stored_fast, stored_slow)) = existing {
2998        #[allow(clippy::cast_possible_truncation)]
2999        let stored_fast_f32 = (stored_fast as f32).clamp(0.0, 1.0);
3000        #[allow(clippy::cast_possible_truncation)]
3001        let stored_slow_f32 = (stored_slow as f32).clamp(0.0, 1.0);
3002        let new_fast = stored_fast_f32 + benna_fast_rate * (confidence - stored_fast_f32);
3003        let new_slow = stored_slow_f32 + benna_slow_rate * (new_fast - stored_slow_f32);
3004        zeph_db::query(sql!(
3005            "UPDATE graph_edges SET confidence_fast = ?, confidence_slow = ? WHERE id = ?"
3006        ))
3007        .bind(f64::from(new_fast))
3008        .bind(f64::from(new_slow))
3009        .bind(head_id)
3010        .execute(&mut **tx)
3011        .await?;
3012    }
3013
3014    Ok(())
3015}
3016
3017/// Find the current active head edge for `(src, canonical_relation, edge_type)`.
3018async fn find_prior_active_head(
3019    tx: &mut Tx<'_>,
3020    src: i64,
3021    canon: &str,
3022    edge_type_str: &str,
3023) -> Result<Option<i64>, MemoryError> {
3024    Ok(zeph_db::query_scalar(sql!(
3025        "SELECT id FROM graph_edges
3026         WHERE source_entity_id = ?
3027           AND canonical_relation = ?
3028           AND edge_type = ?
3029           AND valid_to IS NULL
3030           AND expired_at IS NULL
3031         ORDER BY created_at DESC
3032         LIMIT 1"
3033    ))
3034    .bind(src)
3035    .bind(canon)
3036    .bind(edge_type_str)
3037    .fetch_optional(&mut **tx)
3038    .await?)
3039}
3040
3041/// Verify the supersede chain depth for `head_id` does not exceed [`SUPERSEDE_DEPTH_CAP`].
3042///
3043/// Returns `Err(MemoryError::SupersedeDepthExceeded)` when the cap would be exceeded.
3044async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
3045    let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
3046    let depth: Option<i64> = sqlx::query_scalar(
3047        "WITH RECURSIVE chain(id, depth) AS (
3048           SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
3049           UNION ALL
3050           SELECT e.supersedes, c.depth + 1
3051           FROM graph_edges e JOIN chain c ON e.id = c.id
3052           WHERE e.supersedes IS NOT NULL AND c.depth < ?
3053         )
3054         SELECT MAX(depth) FROM chain",
3055    )
3056    .bind(head_id)
3057    .bind(cap)
3058    .fetch_optional(&mut **tx)
3059    .await?
3060    .flatten();
3061    let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
3062    if d > SUPERSEDE_DEPTH_CAP {
3063        return Err(MemoryError::SupersedeDepthExceeded(head_id));
3064    }
3065    Ok(())
3066}
3067
3068/// Insert a new edge row and return its generated ID.
3069#[allow(clippy::too_many_arguments)]
3070async fn insert_new_edge(
3071    tx: &mut Tx<'_>,
3072    src: i64,
3073    tgt: i64,
3074    relation: &str,
3075    canonical_relation: &str,
3076    fact: &str,
3077    confidence: f32,
3078    episode_raw: Option<i64>,
3079    edge_type_str: &str,
3080    supersedes_val: Option<i64>,
3081    turn_index: Option<i64>,
3082) -> Result<i64, MemoryError> {
3083    Ok(zeph_db::query_scalar(sql!(
3084        "INSERT INTO graph_edges
3085         (source_entity_id, target_entity_id, relation, canonical_relation, fact,
3086          confidence, confidence_fast, confidence_slow, episode_id, edge_type, supersedes, turn_index)
3087         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
3088         RETURNING id"
3089    ))
3090    .bind(src)
3091    .bind(tgt)
3092    .bind(relation)
3093    .bind(canonical_relation)
3094    .bind(fact)
3095    .bind(f64::from(confidence))
3096    .bind(f64::from(confidence))
3097    .bind(f64::from(confidence))
3098    .bind(episode_raw)
3099    .bind(edge_type_str)
3100    .bind(supersedes_val)
3101    .bind(turn_index)
3102    .fetch_one(&mut **tx)
3103    .await?)
3104}
3105
3106/// Deactivate the prior head by setting `valid_to` and `expired_at`, leaving `superseded_by`
3107/// unset. Must be called **before** inserting the new row to avoid violating
3108/// `uq_graph_edges_active_head` — the unique index is enforced at statement level, not at commit.
3109async fn expire_prior_head(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
3110    zeph_db::query(sql!(
3111        "UPDATE graph_edges
3112         SET valid_to = CURRENT_TIMESTAMP,
3113             expired_at = CURRENT_TIMESTAMP
3114         WHERE id = ?"
3115    ))
3116    .bind(head_id)
3117    .execute(&mut **tx)
3118    .await?;
3119    Ok(())
3120}
3121
3122/// Back-fill `superseded_by` on the already-expired prior head after the new row has been
3123/// inserted and its ID is known.
3124async fn set_superseded_by(tx: &mut Tx<'_>, head_id: i64, new_id: i64) -> Result<(), MemoryError> {
3125    zeph_db::query(sql!(
3126        "UPDATE graph_edges SET superseded_by = ? WHERE id = ?"
3127    ))
3128    .bind(new_id)
3129    .bind(head_id)
3130    .execute(&mut **tx)
3131    .await?;
3132    Ok(())
3133}
3134
3135// ── FTS helpers ───────────────────────────────────────────────────────────────
3136
/// Row type for the UNION ALL FTS5 query in `find_entities_ranked`.
///
/// The tuple positions follow the column order selected by the ranked-search SQL and are
/// destructured in the same order by `normalize_and_dedup`.
type EntityFtsRow = (
    // `id`
    i64,
    // `name`
    String,
    // `canonical_name`
    String,
    // `entity_type` (raw string, parsed later)
    String,
    // `summary`
    Option<String>,
    // `first_seen_at`
    String,
    // `last_seen_at`
    String,
    // `qdrant_point_id`
    Option<String>,
    // `fts_rank` (raw, un-normalized score)
    f64,
);
3149
3150/// Sanitize and tokenize `query` into an FTS5 query string.
3151///
3152/// Returns `None` when the input is empty or contains only FTS5 operator tokens,
3153/// indicating no search should be attempted.
3154fn build_fts_query(query: &str) -> Option<String> {
3155    const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
3156    let sanitized = sanitize_fts_query(query);
3157    if sanitized.is_empty() {
3158        return None;
3159    }
3160    let fts_query: String = sanitized
3161        .split_whitespace()
3162        .filter(|t| !FTS5_OPERATORS.contains(t))
3163        .map(|t| format!("{t}*"))
3164        .collect::<Vec<_>>()
3165        .join(" ");
3166    if fts_query.is_empty() {
3167        None
3168    } else {
3169        Some(fts_query)
3170    }
3171}
3172
3173/// Build the UNION ALL FTS5 SQL for entity ranked search.
3174///
3175/// The SQL selects entities by direct FTS5 match (negated BM25) and by alias LIKE match
3176/// (fixed score 0.5), then orders by score descending with a LIMIT applied outside.
3177fn build_ranked_fts_sql() -> String {
3178    format!(
3179        "SELECT * FROM ( \
3180             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
3181                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
3182                    -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
3183             FROM graph_entities_fts fts \
3184             JOIN graph_entities e ON e.id = fts.rowid \
3185             WHERE graph_entities_fts MATCH ? \
3186             UNION ALL \
3187             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
3188                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
3189                    0.5 AS fts_rank \
3190             FROM graph_entity_aliases a \
3191             JOIN graph_entities e ON e.id = a.entity_id \
3192             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
3193         ) \
3194         ORDER BY fts_rank DESC \
3195         LIMIT ?",
3196        <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
3197    )
3198}
3199
3200/// Normalize FTS scores to `[0, 1]` and deduplicate by entity ID.
3201///
3202/// Deduplication keeps the first (highest-ranked) occurrence of each entity ID.
3203fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
3204    // Guard against div-by-zero when all scores are 0.
3205    let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
3206    let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
3207
3208    let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
3209    let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
3210    for (
3211        id,
3212        name,
3213        canonical_name,
3214        entity_type_str,
3215        summary,
3216        first_seen_at,
3217        last_seen_at,
3218        qdrant_point_id,
3219        raw_score,
3220    ) in rows
3221    {
3222        if !seen_ids.insert(id) {
3223            continue;
3224        }
3225        let entity_type = entity_type_str
3226            .parse()
3227            .unwrap_or(super::types::EntityType::Concept);
3228        let entity = Entity {
3229            id: EntityId(id),
3230            name,
3231            canonical_name,
3232            entity_type,
3233            summary,
3234            first_seen_at,
3235            last_seen_at,
3236            qdrant_point_id,
3237        };
3238        #[allow(clippy::cast_possible_truncation)]
3239        let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
3240        result.push((entity, normalized));
3241    }
3242    result
3243}
3244
3245// ── Tests ─────────────────────────────────────────────────────────────────────
3246
3247#[cfg(test)]
3248mod tests;