Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use futures::Stream;
9use zeph_db::fts::sanitize_fts_query;
10use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
11
12use crate::error::MemoryError;
13use crate::types::MessageId;
14
15use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
16
17pub struct GraphStore {
18    pool: DbPool,
19}
20
21impl GraphStore {
22    #[must_use]
23    pub fn new(pool: DbPool) -> Self {
24        Self { pool }
25    }
26
27    #[must_use]
28    pub fn pool(&self) -> &DbPool {
29        &self.pool
30    }
31
32    // ── Entities ─────────────────────────────────────────────────────────────
33
34    /// Insert or update an entity by `(canonical_name, entity_type)`.
35    ///
36    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
37    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
38    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
39    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the database query fails.
44    pub async fn upsert_entity(
45        &self,
46        surface_name: &str,
47        canonical_name: &str,
48        entity_type: EntityType,
49        summary: Option<&str>,
50    ) -> Result<i64, MemoryError> {
51        let type_str = entity_type.as_str();
52        let id: i64 = zeph_db::query_scalar(sql!(
53            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
54             VALUES (?, ?, ?, ?)
55             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
56               name = excluded.name,
57               summary = COALESCE(excluded.summary, summary),
58               last_seen_at = CURRENT_TIMESTAMP
59             RETURNING id"
60        ))
61        .bind(surface_name)
62        .bind(canonical_name)
63        .bind(type_str)
64        .bind(summary)
65        .fetch_one(&self.pool)
66        .await?;
67        Ok(id)
68    }
69
70    /// Find an entity by exact canonical name and type.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the database query fails.
75    pub async fn find_entity(
76        &self,
77        canonical_name: &str,
78        entity_type: EntityType,
79    ) -> Result<Option<Entity>, MemoryError> {
80        let type_str = entity_type.as_str();
81        let row: Option<EntityRow> = zeph_db::query_as(
82            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
83             FROM graph_entities
84             WHERE canonical_name = ? AND entity_type = ?"),
85        )
86        .bind(canonical_name)
87        .bind(type_str)
88        .fetch_optional(&self.pool)
89        .await?;
90        row.map(entity_from_row).transpose()
91    }
92
93    /// Find an entity by its numeric ID.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the database query fails.
98    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
99        let row: Option<EntityRow> = zeph_db::query_as(
100            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
101             FROM graph_entities
102             WHERE id = ?"),
103        )
104        .bind(entity_id)
105        .fetch_optional(&self.pool)
106        .await?;
107        row.map(entity_from_row).transpose()
108    }
109
110    /// Update the `qdrant_point_id` for an entity.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the database query fails.
115    pub async fn set_entity_qdrant_point_id(
116        &self,
117        entity_id: i64,
118        point_id: &str,
119    ) -> Result<(), MemoryError> {
120        zeph_db::query(sql!(
121            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
122        ))
123        .bind(point_id)
124        .bind(entity_id)
125        .execute(&self.pool)
126        .await?;
127        Ok(())
128    }
129
130    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
131    ///
132    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
133    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
134    /// alias matches via a UNION query.
135    ///
136    /// # Behavioral note
137    ///
138    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
139    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
140    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
141    /// mid-word substrings are not matched. This is a known trade-off for index performance.
142    ///
143    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
144    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
145    /// causes noise in practice, add a minimum length guard at the call site.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the database query fails.
150    pub async fn find_entities_fuzzy(
151        &self,
152        query: &str,
153        limit: usize,
154    ) -> Result<Vec<Entity>, MemoryError> {
155        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
156        // prevents syntax errors when user input contains them as literal search terms
157        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
158        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
159        let query = &query[..query.floor_char_boundary(512)];
160        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
161        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
162        let sanitized = sanitize_fts_query(query);
163        if sanitized.is_empty() {
164            return Ok(vec![]);
165        }
166        let fts_query: String = sanitized
167            .split_whitespace()
168            .filter(|t| !FTS5_OPERATORS.contains(t))
169            .map(|t| format!("{t}*"))
170            .collect::<Vec<_>>()
171            .join(" ");
172        if fts_query.is_empty() {
173            return Ok(vec![]);
174        }
175
176        let limit = i64::try_from(limit)?;
177        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
178        // bm25() returns negative values; ORDER BY ASC puts best matches first.
179        let search_sql = format!(
180            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
181                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
182             FROM graph_entities_fts fts \
183             JOIN graph_entities e ON e.id = fts.rowid \
184             WHERE graph_entities_fts MATCH ? \
185             UNION \
186             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
187                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
188             FROM graph_entity_aliases a \
189             JOIN graph_entities e ON e.id = a.entity_id \
190             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
191             LIMIT ?",
192            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
193        );
194        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
195            .bind(&fts_query)
196            .bind(format!(
197                "%{}%",
198                query
199                    .trim()
200                    .replace('\\', "\\\\")
201                    .replace('%', "\\%")
202                    .replace('_', "\\_")
203            ))
204            .bind(limit)
205            .fetch_all(&self.pool)
206            .await?;
207        rows.into_iter()
208            .map(entity_from_row)
209            .collect::<Result<Vec<_>, _>>()
210    }
211
212    /// Flush the `SQLite` WAL to the main database file.
213    ///
214    /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
215    /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
216    /// visible to connections opened in future sessions.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the PRAGMA execution fails.
221    #[cfg(feature = "sqlite")]
222    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
223        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
224            .execute(&self.pool)
225            .await?;
226        Ok(())
227    }
228
229    /// No-op on `PostgreSQL` (WAL management is handled by the server).
230    ///
231    /// # Errors
232    ///
233    /// Never returns an error.
234    #[cfg(feature = "postgres")]
235    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
236        Ok(())
237    }
238
239    /// Stream all entities from the database incrementally (true cursor, no full-table load).
240    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
241        use futures::StreamExt as _;
242        zeph_db::query_as::<_, EntityRow>(
243            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
244             FROM graph_entities ORDER BY id ASC"),
245        )
246        .fetch(&self.pool)
247        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
248            r.map_err(MemoryError::from).and_then(entity_from_row)
249        })
250    }
251
252    // ── Alias methods ─────────────────────────────────────────────────────────
253
254    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the database query fails.
259    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
260        let insert_alias_sql = format!(
261            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
262            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
263            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
264        );
265        zeph_db::query(&insert_alias_sql)
266            .bind(entity_id)
267            .bind(alias_name)
268            .execute(&self.pool)
269            .await?;
270        Ok(())
271    }
272
273    /// Find an entity by alias name and entity type (case-insensitive).
274    ///
275    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the database query fails.
280    pub async fn find_entity_by_alias(
281        &self,
282        alias_name: &str,
283        entity_type: EntityType,
284    ) -> Result<Option<Entity>, MemoryError> {
285        let type_str = entity_type.as_str();
286        let alias_typed_sql = format!(
287            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
288                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
289             FROM graph_entity_aliases a \
290             JOIN graph_entities e ON e.id = a.entity_id \
291             WHERE a.alias_name = ? {} \
292               AND e.entity_type = ? \
293             ORDER BY e.id ASC \
294             LIMIT 1",
295            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
296        );
297        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
298            .bind(alias_name)
299            .bind(type_str)
300            .fetch_optional(&self.pool)
301            .await?;
302        row.map(entity_from_row).transpose()
303    }
304
305    /// Get all aliases for an entity.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the database query fails.
310    pub async fn aliases_for_entity(
311        &self,
312        entity_id: i64,
313    ) -> Result<Vec<EntityAlias>, MemoryError> {
314        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
315            "SELECT id, entity_id, alias_name, created_at
316             FROM graph_entity_aliases
317             WHERE entity_id = ?
318             ORDER BY id ASC"
319        ))
320        .bind(entity_id)
321        .fetch_all(&self.pool)
322        .await?;
323        Ok(rows.into_iter().map(alias_from_row).collect())
324    }
325
326    /// Collect all entities into a Vec.
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the database query fails or `entity_type` parsing fails.
331    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
332        use futures::TryStreamExt as _;
333        self.all_entities_stream().try_collect().await
334    }
335
336    /// Count the total number of entities.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the database query fails.
341    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
342        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
343            .fetch_one(&self.pool)
344            .await?;
345        Ok(count)
346    }
347
348    // ── Edges ─────────────────────────────────────────────────────────────────
349
350    /// Insert a new edge between two entities, or update the existing active edge.
351    ///
352    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
353    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
354    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
355    /// duplicate edges from repeated extraction of the same context messages.
356    ///
357    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
358    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
359    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
360    /// update the first and lose the type classification.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the database query fails.
365    pub async fn insert_edge(
366        &self,
367        source_entity_id: i64,
368        target_entity_id: i64,
369        relation: &str,
370        fact: &str,
371        confidence: f32,
372        episode_id: Option<MessageId>,
373    ) -> Result<i64, MemoryError> {
374        self.insert_edge_typed(
375            source_entity_id,
376            target_entity_id,
377            relation,
378            fact,
379            confidence,
380            episode_id,
381            EdgeType::Semantic,
382        )
383        .await
384    }
385
386    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
387    ///
388    /// Identical semantics to [`insert_edge`] but with an explicit `edge_type` parameter.
389    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the database query fails.
394    #[allow(clippy::too_many_arguments)]
395    pub async fn insert_edge_typed(
396        &self,
397        source_entity_id: i64,
398        target_entity_id: i64,
399        relation: &str,
400        fact: &str,
401        confidence: f32,
402        episode_id: Option<MessageId>,
403        edge_type: EdgeType,
404    ) -> Result<i64, MemoryError> {
405        if source_entity_id == target_entity_id {
406            return Err(MemoryError::InvalidInput(format!(
407                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
408            )));
409        }
410        let confidence = confidence.clamp(0.0, 1.0);
411        let edge_type_str = edge_type.as_str();
412
413        // Wrap SELECT + INSERT/UPDATE in a single transaction to eliminate the race window
414        // between existence check and write. The unique partial index uq_graph_edges_active
415        // covers (source, target, relation, edge_type) WHERE valid_to IS NULL; SQLite does not
416        // support ON CONFLICT DO UPDATE against partial indexes, so we keep two statements.
417        let mut tx = zeph_db::begin(&self.pool).await?;
418
419        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
420            "SELECT id, confidence FROM graph_edges
421             WHERE source_entity_id = ?
422               AND target_entity_id = ?
423               AND relation = ?
424               AND edge_type = ?
425               AND valid_to IS NULL
426             LIMIT 1"
427        ))
428        .bind(source_entity_id)
429        .bind(target_entity_id)
430        .bind(relation)
431        .bind(edge_type_str)
432        .fetch_optional(&mut *tx)
433        .await?;
434
435        if let Some((existing_id, stored_conf)) = existing {
436            let updated_conf = f64::from(confidence).max(stored_conf);
437            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
438                .bind(updated_conf)
439                .bind(existing_id)
440                .execute(&mut *tx)
441                .await?;
442            tx.commit().await?;
443            return Ok(existing_id);
444        }
445
446        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
447        let id: i64 = zeph_db::query_scalar(sql!(
448            "INSERT INTO graph_edges
449             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
450             VALUES (?, ?, ?, ?, ?, ?, ?)
451             RETURNING id"
452        ))
453        .bind(source_entity_id)
454        .bind(target_entity_id)
455        .bind(relation)
456        .bind(fact)
457        .bind(f64::from(confidence))
458        .bind(episode_raw)
459        .bind(edge_type_str)
460        .fetch_one(&mut *tx)
461        .await?;
462        tx.commit().await?;
463        Ok(id)
464    }
465
466    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the database update fails.
471    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
472        zeph_db::query(sql!(
473            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
474             WHERE id = ?"
475        ))
476        .bind(edge_id)
477        .execute(&self.pool)
478        .await?;
479        Ok(())
480    }
481
482    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
483    ///
484    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
485    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
486    /// in v1 — see `bfs_at_timestamp` for historical traversal).
487    ///
488    /// # `SQLite` bind limit safety
489    ///
490    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
491    /// default). Each entity ID requires two bind slots (source OR target), so batches are
492    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
493    /// compile-time `SQLite` configuration.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the database query fails.
498    pub async fn edges_for_entities(
499        &self,
500        entity_ids: &[i64],
501        edge_types: &[super::types::EdgeType],
502    ) -> Result<Vec<Edge>, MemoryError> {
503        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
504        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
505        // 490 * 2 = 980, leaving headroom for future query additions.
506        const MAX_BATCH_ENTITIES: usize = 490;
507
508        let mut all_edges: Vec<Edge> = Vec::new();
509
510        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
511            let edges = self.query_batch_edges(chunk, edge_types).await?;
512            all_edges.extend(edges);
513        }
514
515        Ok(all_edges)
516    }
517
518    /// Query active edges for a single chunk of entity IDs (internal helper).
519    ///
520    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
521    async fn query_batch_edges(
522        &self,
523        entity_ids: &[i64],
524        edge_types: &[super::types::EdgeType],
525    ) -> Result<Vec<Edge>, MemoryError> {
526        if entity_ids.is_empty() {
527            return Ok(Vec::new());
528        }
529
530        // Build a parameterized IN clause with backend-appropriate placeholders.
531        // We cannot use the sql! macro here because the placeholder count is dynamic.
532        let n_ids = entity_ids.len();
533        let n_types = edge_types.len();
534
535        let sql = if n_types == 0 {
536            // placeholders used twice (source IN and target IN)
537            let placeholders = placeholder_list(1, n_ids);
538            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
539            format!(
540                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
541                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
542                        edge_type, retrieval_count, last_retrieved_at
543                 FROM graph_edges
544                 WHERE valid_to IS NULL
545                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
546            )
547        } else {
548            let placeholders = placeholder_list(1, n_ids);
549            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
550            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
551            format!(
552                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
553                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
554                        edge_type, retrieval_count, last_retrieved_at
555                 FROM graph_edges
556                 WHERE valid_to IS NULL
557                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
558                   AND edge_type IN ({type_placeholders})"
559            )
560        };
561
562        // Bind entity IDs twice (source IN and target IN clauses) then edge types.
563        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
564        for id in entity_ids {
565            query = query.bind(*id);
566        }
567        for id in entity_ids {
568            query = query.bind(*id);
569        }
570        for et in edge_types {
571            query = query.bind(et.as_str());
572        }
573
574        let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
575        Ok(rows.into_iter().map(edge_from_row).collect())
576    }
577
578    /// Get all active edges where entity is source or target.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if the database query fails.
583    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
584        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
585            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
586                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
587                    edge_type, retrieval_count, last_retrieved_at
588             FROM graph_edges
589             WHERE valid_to IS NULL
590               AND (source_entity_id = ? OR target_entity_id = ?)"
591        ))
592        .bind(entity_id)
593        .bind(entity_id)
594        .fetch_all(&self.pool)
595        .await?;
596        Ok(rows.into_iter().map(edge_from_row).collect())
597    }
598
599    /// Get all edges (active and expired) where entity is source or target, ordered by
600    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if the database query fails or if `limit` overflows `i64`.
605    pub async fn edge_history_for_entity(
606        &self,
607        entity_id: i64,
608        limit: usize,
609    ) -> Result<Vec<Edge>, MemoryError> {
610        let limit = i64::try_from(limit)?;
611        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
612            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
613                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
614                    edge_type, retrieval_count, last_retrieved_at
615             FROM graph_edges
616             WHERE source_entity_id = ? OR target_entity_id = ?
617             ORDER BY valid_from DESC
618             LIMIT ?"
619        ))
620        .bind(entity_id)
621        .bind(entity_id)
622        .bind(limit)
623        .fetch_all(&self.pool)
624        .await?;
625        Ok(rows.into_iter().map(edge_from_row).collect())
626    }
627
628    /// Get all active edges between two entities (both directions).
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the database query fails.
633    pub async fn edges_between(
634        &self,
635        entity_a: i64,
636        entity_b: i64,
637    ) -> Result<Vec<Edge>, MemoryError> {
638        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
639            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
640                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
641                    edge_type, retrieval_count, last_retrieved_at
642             FROM graph_edges
643             WHERE valid_to IS NULL
644               AND ((source_entity_id = ? AND target_entity_id = ?)
645                 OR (source_entity_id = ? AND target_entity_id = ?))"
646        ))
647        .bind(entity_a)
648        .bind(entity_b)
649        .bind(entity_b)
650        .bind(entity_a)
651        .fetch_all(&self.pool)
652        .await?;
653        Ok(rows.into_iter().map(edge_from_row).collect())
654    }
655
656    /// Get active edges from `source` to `target` in the exact direction (no reverse).
657    ///
658    /// # Errors
659    ///
660    /// Returns an error if the database query fails.
661    pub async fn edges_exact(
662        &self,
663        source_entity_id: i64,
664        target_entity_id: i64,
665    ) -> Result<Vec<Edge>, MemoryError> {
666        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
667            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
668                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
669                    edge_type, retrieval_count, last_retrieved_at
670             FROM graph_edges
671             WHERE valid_to IS NULL
672               AND source_entity_id = ?
673               AND target_entity_id = ?"
674        ))
675        .bind(source_entity_id)
676        .bind(target_entity_id)
677        .fetch_all(&self.pool)
678        .await?;
679        Ok(rows.into_iter().map(edge_from_row).collect())
680    }
681
682    /// Count active (non-invalidated) edges.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the database query fails.
687    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
688        let count: i64 = zeph_db::query_scalar(sql!(
689            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
690        ))
691        .fetch_one(&self.pool)
692        .await?;
693        Ok(count)
694    }
695
696    /// Return per-type active edge counts as `(edge_type, count)` pairs.
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if the database query fails.
701    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
702        let rows: Vec<(String, i64)> = zeph_db::query_as(
703            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
704        )
705        .fetch_all(&self.pool)
706        .await?;
707        Ok(rows)
708    }
709
710    // ── Communities ───────────────────────────────────────────────────────────
711
712    /// Insert or update a community by name.
713    ///
714    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
715    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
716    /// `assign_to_community` adds an entity without a full re-detection pass).
717    ///
718    /// # Errors
719    ///
720    /// Returns an error if the database query fails or JSON serialization fails.
721    pub async fn upsert_community(
722        &self,
723        name: &str,
724        summary: &str,
725        entity_ids: &[i64],
726        fingerprint: Option<&str>,
727    ) -> Result<i64, MemoryError> {
728        let entity_ids_json = serde_json::to_string(entity_ids)?;
729        let id: i64 = zeph_db::query_scalar(sql!(
730            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
731             VALUES (?, ?, ?, ?)
732             ON CONFLICT(name) DO UPDATE SET
733               summary = excluded.summary,
734               entity_ids = excluded.entity_ids,
735               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
736               updated_at = CURRENT_TIMESTAMP
737             RETURNING id"
738        ))
739        .bind(name)
740        .bind(summary)
741        .bind(entity_ids_json)
742        .bind(fingerprint)
743        .fetch_one(&self.pool)
744        .await?;
745        Ok(id)
746    }
747
748    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
749    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
750    ///
751    /// # Errors
752    ///
753    /// Returns an error if the database query fails.
754    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
755        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
756            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
757        ))
758        .fetch_all(&self.pool)
759        .await?;
760        Ok(rows.into_iter().collect())
761    }
762
763    /// Delete a single community by its primary key.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the database query fails.
768    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
769        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
770            .bind(id)
771            .execute(&self.pool)
772            .await?;
773        Ok(())
774    }
775
776    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
777    ///
778    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
779    /// ensuring the next `detect_communities` run re-summarizes the affected community.
780    ///
781    /// # Errors
782    ///
783    /// Returns an error if the database query fails.
784    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
785        zeph_db::query(sql!(
786            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
787        ))
788        .bind(id)
789        .execute(&self.pool)
790        .await?;
791        Ok(())
792    }
793
794    /// Find the first community that contains the given `entity_id`.
795    ///
796    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
797    /// table scan with per-row JSON parsing.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the database query fails or JSON parsing fails.
802    pub async fn community_for_entity(
803        &self,
804        entity_id: i64,
805    ) -> Result<Option<Community>, MemoryError> {
806        let row: Option<CommunityRow> = zeph_db::query_as(
807            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
808             FROM graph_communities c, json_each(c.entity_ids) j
809             WHERE CAST(j.value AS INTEGER) = ?
810             LIMIT 1"),
811        )
812        .bind(entity_id)
813        .fetch_optional(&self.pool)
814        .await?;
815        match row {
816            Some(row) => {
817                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
818                Ok(Some(Community {
819                    id: row.id,
820                    name: row.name,
821                    summary: row.summary,
822                    entity_ids,
823                    fingerprint: row.fingerprint,
824                    created_at: row.created_at,
825                    updated_at: row.updated_at,
826                }))
827            }
828            None => Ok(None),
829        }
830    }
831
832    /// Get all communities.
833    ///
834    /// # Errors
835    ///
836    /// Returns an error if the database query fails or JSON parsing fails.
837    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
838        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
839            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
840             FROM graph_communities
841             ORDER BY id ASC"
842        ))
843        .fetch_all(&self.pool)
844        .await?;
845
846        rows.into_iter()
847            .map(|row| {
848                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
849                Ok(Community {
850                    id: row.id,
851                    name: row.name,
852                    summary: row.summary,
853                    entity_ids,
854                    fingerprint: row.fingerprint,
855                    created_at: row.created_at,
856                    updated_at: row.updated_at,
857                })
858            })
859            .collect()
860    }
861
862    /// Count the total number of communities.
863    ///
864    /// # Errors
865    ///
866    /// Returns an error if the database query fails.
867    pub async fn community_count(&self) -> Result<i64, MemoryError> {
868        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
869            .fetch_one(&self.pool)
870            .await?;
871        Ok(count)
872    }
873
874    // ── Metadata ──────────────────────────────────────────────────────────────
875
876    /// Get a metadata value by key.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the database query fails.
881    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
882        let val: Option<String> =
883            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
884                .bind(key)
885                .fetch_optional(&self.pool)
886                .await?;
887        Ok(val)
888    }
889
890    /// Set a metadata value by key (upsert).
891    ///
892    /// # Errors
893    ///
894    /// Returns an error if the database query fails.
895    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
896        zeph_db::query(sql!(
897            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
898             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
899        ))
900        .bind(key)
901        .bind(value)
902        .execute(&self.pool)
903        .await?;
904        Ok(())
905    }
906
907    /// Get the current extraction count from metadata.
908    ///
909    /// Returns 0 if the counter has not been initialized.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the database query fails.
914    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
915        let val = self.get_metadata("extraction_count").await?;
916        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
917    }
918
919    /// Stream all active (non-invalidated) edges.
920    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
921        use futures::StreamExt as _;
922        zeph_db::query_as::<_, EdgeRow>(sql!(
923            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
924                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
925                    edge_type, retrieval_count, last_retrieved_at
926             FROM graph_edges
927             WHERE valid_to IS NULL
928             ORDER BY id ASC"
929        ))
930        .fetch(&self.pool)
931        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
932    }
933
934    /// Fetch a chunk of active edges using keyset pagination.
935    ///
936    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
937    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
938    /// the returned chunk as `after_id` for the next page. An empty result means all
939    /// edges have been consumed.
940    ///
941    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
942    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
943    /// and will appear in subsequent chunks or after the last chunk, never causing
944    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
945    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the database query fails.
950    pub async fn edges_after_id(
951        &self,
952        after_id: i64,
953        limit: i64,
954    ) -> Result<Vec<Edge>, MemoryError> {
955        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
956            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
957                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
958                    edge_type, retrieval_count, last_retrieved_at
959             FROM graph_edges
960             WHERE valid_to IS NULL AND id > ?
961             ORDER BY id ASC
962             LIMIT ?"
963        ))
964        .bind(after_id)
965        .bind(limit)
966        .fetch_all(&self.pool)
967        .await?;
968        Ok(rows.into_iter().map(edge_from_row).collect())
969    }
970
971    /// Find a community by its primary key.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the database query fails or JSON parsing fails.
976    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
977        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
978            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
979             FROM graph_communities
980             WHERE id = ?"
981        ))
982        .bind(id)
983        .fetch_optional(&self.pool)
984        .await?;
985        match row {
986            Some(row) => {
987                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
988                Ok(Some(Community {
989                    id: row.id,
990                    name: row.name,
991                    summary: row.summary,
992                    entity_ids,
993                    fingerprint: row.fingerprint,
994                    created_at: row.created_at,
995                    updated_at: row.updated_at,
996                }))
997            }
998            None => Ok(None),
999        }
1000    }
1001
1002    /// Delete all communities (full rebuild before upsert).
1003    ///
1004    /// # Errors
1005    ///
1006    /// Returns an error if the database query fails.
1007    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1008        zeph_db::query(sql!("DELETE FROM graph_communities"))
1009            .execute(&self.pool)
1010            .await?;
1011        Ok(())
1012    }
1013
1014    // ── A-MEM Retrieval Tracking ──────────────────────────────────────────────
1015
1016    /// Find entities matching `query` and return them with normalized FTS5 scores.
1017    ///
1018    /// Returns `Vec<(Entity, fts_score)>` where `fts_score` is normalized to `[0.0, 1.0]`
1019    /// by dividing each negated BM25 value by the maximum in the result set.
1020    /// Alias matches receive a fixed score of `0.5` (relative to FTS matches before normalization).
1021    ///
1022    /// Uses `UNION ALL` with outer `ORDER BY` to preserve FTS5 ordering through the LIMIT.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the database query fails.
1027    #[allow(clippy::too_many_lines)]
1028    pub async fn find_entities_ranked(
1029        &self,
1030        query: &str,
1031        limit: usize,
1032    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1033        // Row type for UNION ALL FTS5 query: (id, name, canonical_name, entity_type,
1034        // summary, first_seen_at, last_seen_at, qdrant_point_id, fts_rank).
1035        type EntityFtsRow = (
1036            i64,
1037            String,
1038            String,
1039            String,
1040            Option<String>,
1041            String,
1042            String,
1043            Option<String>,
1044            f64,
1045        );
1046
1047        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
1048        let query = &query[..query.floor_char_boundary(512)];
1049        let sanitized = sanitize_fts_query(query);
1050        if sanitized.is_empty() {
1051            return Ok(vec![]);
1052        }
1053        let fts_query: String = sanitized
1054            .split_whitespace()
1055            .filter(|t| !FTS5_OPERATORS.contains(t))
1056            .map(|t| format!("{t}*"))
1057            .collect::<Vec<_>>()
1058            .join(" ");
1059        if fts_query.is_empty() {
1060            return Ok(vec![]);
1061        }
1062
1063        let limit_i64 = i64::try_from(limit)?;
1064
1065        // UNION ALL with outer ORDER BY preserves FTS5 BM25 ordering through LIMIT.
1066        // Alias matches get a fixed raw score of 0.5 (below any real BM25 match).
1067        let ranked_fts_sql = format!(
1068            "SELECT * FROM ( \
1069                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1070                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1071                        -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
1072                 FROM graph_entities_fts fts \
1073                 JOIN graph_entities e ON e.id = fts.rowid \
1074                 WHERE graph_entities_fts MATCH ? \
1075                 UNION ALL \
1076                 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1077                        e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1078                        0.5 AS fts_rank \
1079                 FROM graph_entity_aliases a \
1080                 JOIN graph_entities e ON e.id = a.entity_id \
1081                 WHERE a.alias_name LIKE ? ESCAPE '\\\\' {} \
1082             ) \
1083             ORDER BY fts_rank DESC \
1084             LIMIT ?",
1085            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1086        );
1087        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1088            .bind(&fts_query)
1089            .bind(format!(
1090                "%{}%",
1091                query
1092                    .trim()
1093                    .replace('\\', "\\\\")
1094                    .replace('%', "\\%")
1095                    .replace('_', "\\_")
1096            ))
1097            .bind(limit_i64)
1098            .fetch_all(&self.pool)
1099            .await?;
1100
1101        if rows.is_empty() {
1102            return Ok(vec![]);
1103        }
1104
1105        // Normalize FTS scores to [0, 1] by dividing by max; guard against div-by-zero.
1106        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
1107        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
1108
1109        // Deduplicate by entity ID (keep first/highest-ranked occurrence).
1110        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
1111        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
1112        for (
1113            id,
1114            name,
1115            canonical_name,
1116            entity_type_str,
1117            summary,
1118            first_seen_at,
1119            last_seen_at,
1120            qdrant_point_id,
1121            raw_score,
1122        ) in rows
1123        {
1124            if !seen_ids.insert(id) {
1125                continue;
1126            }
1127            let entity_type = entity_type_str
1128                .parse()
1129                .unwrap_or(super::types::EntityType::Concept);
1130            let entity = Entity {
1131                id,
1132                name,
1133                canonical_name,
1134                entity_type,
1135                summary,
1136                first_seen_at,
1137                last_seen_at,
1138                qdrant_point_id,
1139            };
1140            #[allow(clippy::cast_possible_truncation)]
1141            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
1142            result.push((entity, normalized));
1143        }
1144
1145        Ok(result)
1146    }
1147
1148    /// Compute structural scores (degree + edge type diversity) for a batch of entity IDs.
1149    ///
1150    /// Returns `HashMap<entity_id, structural_score>` where score is in `[0.0, 1.0]`.
1151    /// Formula: `0.6 * (degree / max_degree) + 0.4 * (type_diversity / 4.0)`.
1152    /// Entities with no edges receive score `0.0`.
1153    ///
1154    /// # Errors
1155    ///
1156    /// Returns an error if the database query fails.
1157    pub async fn entity_structural_scores(
1158        &self,
1159        entity_ids: &[i64],
1160    ) -> Result<HashMap<i64, f32>, MemoryError> {
1161        // Each query binds entity_ids three times (three IN clauses).
1162        // Stay safely under SQLite 999-variable limit: 999 / 3 = 333, use 163 for headroom.
1163        const MAX_BATCH: usize = 163;
1164
1165        if entity_ids.is_empty() {
1166            return Ok(HashMap::new());
1167        }
1168
1169        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1170        for chunk in entity_ids.chunks(MAX_BATCH) {
1171            let n = chunk.len();
1172            // Three copies of chunk IDs: positions 1..n, n+1..2n, 2n+1..3n
1173            let ph1 = placeholder_list(1, n);
1174            let ph2 = placeholder_list(n + 1, n);
1175            let ph3 = placeholder_list(n * 2 + 1, n);
1176
1177            // Build query: count degree and distinct edge types for each entity.
1178            let sql = format!(
1179                "SELECT entity_id,
1180                        COUNT(*) AS degree,
1181                        COUNT(DISTINCT edge_type) AS type_diversity
1182                 FROM (
1183                     SELECT source_entity_id AS entity_id, edge_type
1184                     FROM graph_edges
1185                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1186                     UNION ALL
1187                     SELECT target_entity_id AS entity_id, edge_type
1188                     FROM graph_edges
1189                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1190                 )
1191                 WHERE entity_id IN ({ph3})
1192                 GROUP BY entity_id"
1193            );
1194
1195            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1196            // Bind chunk three times (three IN clauses)
1197            for id in chunk {
1198                query = query.bind(*id);
1199            }
1200            for id in chunk {
1201                query = query.bind(*id);
1202            }
1203            for id in chunk {
1204                query = query.bind(*id);
1205            }
1206
1207            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1208            all_rows.extend(chunk_rows);
1209        }
1210
1211        if all_rows.is_empty() {
1212            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1213        }
1214
1215        let max_degree = all_rows
1216            .iter()
1217            .map(|(_, d, _)| *d)
1218            .max()
1219            .unwrap_or(1)
1220            .max(1);
1221
1222        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1223        for (entity_id, degree, type_diversity) in all_rows {
1224            #[allow(clippy::cast_precision_loss)]
1225            let norm_degree = degree as f32 / max_degree as f32;
1226            #[allow(clippy::cast_precision_loss)]
1227            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1228            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1229            scores.insert(entity_id, score);
1230        }
1231
1232        Ok(scores)
1233    }
1234
1235    /// Look up community IDs for a batch of entity IDs.
1236    ///
1237    /// Returns `HashMap<entity_id, community_id>`. Entities not assigned to any community
1238    /// are absent from the map (treated as `None` by callers — no community cap applied).
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if the database query fails.
1243    pub async fn entity_community_ids(
1244        &self,
1245        entity_ids: &[i64],
1246    ) -> Result<HashMap<i64, i64>, MemoryError> {
1247        const MAX_BATCH: usize = 490;
1248
1249        if entity_ids.is_empty() {
1250            return Ok(HashMap::new());
1251        }
1252
1253        let mut result: HashMap<i64, i64> = HashMap::new();
1254        for chunk in entity_ids.chunks(MAX_BATCH) {
1255            let placeholders = placeholder_list(1, chunk.len());
1256
1257            let community_sql = community_ids_sql(&placeholders);
1258            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1259            for id in chunk {
1260                query = query.bind(*id);
1261            }
1262
1263            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1264            result.extend(rows);
1265        }
1266
1267        Ok(result)
1268    }
1269
1270    /// Increment `retrieval_count` and set `last_retrieved_at` for a batch of edge IDs.
1271    ///
1272    /// Fire-and-forget: errors are logged but not propagated. Caller should log the warning.
1273    /// Batched with `MAX_BATCH = 490` to stay safely under `SQLite` bind variable limit.
1274    ///
1275    /// # Errors
1276    ///
1277    /// Returns an error if the database query fails.
1278    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1279        const MAX_BATCH: usize = 490;
1280        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1281        for chunk in edge_ids.chunks(MAX_BATCH) {
1282            let edge_placeholders = placeholder_list(1, chunk.len());
1283            let retrieval_sql = format!(
1284                "UPDATE graph_edges \
1285                 SET retrieval_count = retrieval_count + 1, \
1286                     last_retrieved_at = {epoch_now} \
1287                 WHERE id IN ({edge_placeholders})"
1288            );
1289            let mut q = zeph_db::query(&retrieval_sql);
1290            for id in chunk {
1291                q = q.bind(*id);
1292            }
1293            q.execute(&self.pool).await?;
1294        }
1295        Ok(())
1296    }
1297
1298    /// Apply multiplicative decay to `retrieval_count` for un-retrieved active edges.
1299    ///
1300    /// Only edges with `retrieval_count > 0` and `last_retrieved_at < (now - interval_secs)`
1301    /// are updated. Returns the number of rows affected.
1302    ///
1303    /// # Errors
1304    ///
1305    /// Returns an error if the database query fails.
1306    pub async fn decay_edge_retrieval_counts(
1307        &self,
1308        decay_lambda: f64,
1309        interval_secs: u64,
1310    ) -> Result<usize, MemoryError> {
1311        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1312        let decay_raw = format!(
1313            "UPDATE graph_edges \
1314             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1315             WHERE valid_to IS NULL \
1316               AND retrieval_count > 0 \
1317               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1318        );
1319        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1320        let result = zeph_db::query(&decay_sql)
1321            .bind(decay_lambda)
1322            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1323            .execute(&self.pool)
1324            .await?;
1325        Ok(usize::try_from(result.rows_affected())?)
1326    }
1327
1328    /// Delete expired edges older than `retention_days` and return count deleted.
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if the database query fails.
1333    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1334        let days = i64::from(retention_days);
1335        let result = zeph_db::query(sql!(
1336            "DELETE FROM graph_edges
1337             WHERE expired_at IS NOT NULL
1338               AND expired_at < datetime('now', '-' || ? || ' days')"
1339        ))
1340        .bind(days)
1341        .execute(&self.pool)
1342        .await?;
1343        Ok(usize::try_from(result.rows_affected())?)
1344    }
1345
1346    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
1347    ///
1348    /// # Errors
1349    ///
1350    /// Returns an error if the database query fails.
1351    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1352        let days = i64::from(retention_days);
1353        let result = zeph_db::query(sql!(
1354            "DELETE FROM graph_entities
1355             WHERE id NOT IN (
1356                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1357                 UNION
1358                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1359             )
1360             AND last_seen_at < datetime('now', '-' || ? || ' days')"
1361        ))
1362        .bind(days)
1363        .execute(&self.pool)
1364        .await?;
1365        Ok(usize::try_from(result.rows_affected())?)
1366    }
1367
1368    /// Delete the oldest excess entities when count exceeds `max_entities`.
1369    ///
1370    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
1371    /// Only deletes when `entity_count() > max_entities`.
1372    ///
1373    /// # Errors
1374    ///
1375    /// Returns an error if the database query fails.
1376    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1377        let current = self.entity_count().await?;
1378        let max = i64::try_from(max_entities)?;
1379        if current <= max {
1380            return Ok(0);
1381        }
1382        let excess = current - max;
1383        let result = zeph_db::query(sql!(
1384            "DELETE FROM graph_entities
1385             WHERE id IN (
1386                 SELECT e.id
1387                 FROM graph_entities e
1388                 LEFT JOIN (
1389                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1390                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1391                     UNION ALL
1392                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1393                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1394                 ) edge_counts ON e.id = edge_counts.eid
1395                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1396                 LIMIT ?
1397             )"
1398        ))
1399        .bind(excess)
1400        .execute(&self.pool)
1401        .await?;
1402        Ok(usize::try_from(result.rows_affected())?)
1403    }
1404
1405    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1406
1407    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1408    ///
1409    /// An edge is valid at `timestamp` when:
1410    /// - `valid_from <= timestamp`, AND
1411    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1412    ///
1413    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1414    ///
1415    /// # Errors
1416    ///
1417    /// Returns an error if the database query fails.
1418    pub async fn edges_at_timestamp(
1419        &self,
1420        entity_id: i64,
1421        timestamp: &str,
1422    ) -> Result<Vec<Edge>, MemoryError> {
1423        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1424        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1425        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1426        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1427            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1428                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1429                    edge_type, retrieval_count, last_retrieved_at
1430             FROM graph_edges
1431             WHERE valid_to IS NULL
1432               AND valid_from <= ?
1433               AND (source_entity_id = ? OR target_entity_id = ?)
1434             UNION ALL
1435             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1436                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1437                    edge_type, retrieval_count, last_retrieved_at
1438             FROM graph_edges
1439             WHERE valid_to IS NOT NULL
1440               AND valid_from <= ?
1441               AND valid_to > ?
1442               AND (source_entity_id = ? OR target_entity_id = ?)"
1443        ))
1444        .bind(timestamp)
1445        .bind(entity_id)
1446        .bind(entity_id)
1447        .bind(timestamp)
1448        .bind(timestamp)
1449        .bind(entity_id)
1450        .bind(entity_id)
1451        .fetch_all(&self.pool)
1452        .await?;
1453        Ok(rows.into_iter().map(edge_from_row).collect())
1454    }
1455
1456    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1457    ///
1458    /// The optional `relation` filter restricts results to a specific relation label.
1459    /// Results are ordered by `valid_from DESC` (most recent first).
1460    ///
1461    /// # Errors
1462    ///
1463    /// Returns an error if the database query fails.
1464    pub async fn edge_history(
1465        &self,
1466        source_entity_id: i64,
1467        predicate: &str,
1468        relation: Option<&str>,
1469        limit: usize,
1470    ) -> Result<Vec<Edge>, MemoryError> {
1471        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1472        let escaped = predicate
1473            .replace('\\', "\\\\")
1474            .replace('%', "\\%")
1475            .replace('_', "\\_");
1476        let like_pattern = format!("%{escaped}%");
1477        let limit = i64::try_from(limit)?;
1478        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1479            zeph_db::query_as(sql!(
1480                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1481                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1482                        edge_type, retrieval_count, last_retrieved_at
1483                 FROM graph_edges
1484                 WHERE source_entity_id = ?
1485                   AND fact LIKE ? ESCAPE '\\'
1486                   AND relation = ?
1487                 ORDER BY valid_from DESC
1488                 LIMIT ?"
1489            ))
1490            .bind(source_entity_id)
1491            .bind(&like_pattern)
1492            .bind(rel)
1493            .bind(limit)
1494            .fetch_all(&self.pool)
1495            .await?
1496        } else {
1497            zeph_db::query_as(sql!(
1498                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1499                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1500                        edge_type, retrieval_count, last_retrieved_at
1501                 FROM graph_edges
1502                 WHERE source_entity_id = ?
1503                   AND fact LIKE ? ESCAPE '\\'
1504                 ORDER BY valid_from DESC
1505                 LIMIT ?"
1506            ))
1507            .bind(source_entity_id)
1508            .bind(&like_pattern)
1509            .bind(limit)
1510            .fetch_all(&self.pool)
1511            .await?
1512        };
1513        Ok(rows.into_iter().map(edge_from_row).collect())
1514    }
1515
1516    // ── BFS Traversal ─────────────────────────────────────────────────────────
1517
1518    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1519    ///
1520    /// Returns all reachable entities and the active edges connecting them.
1521    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1522    /// of `SQLite` CTE limitations.
1523    ///
1524    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1525    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1526    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1527    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns an error if any database query fails.
1532    pub async fn bfs(
1533        &self,
1534        start_entity_id: i64,
1535        max_hops: u32,
1536    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1537        self.bfs_with_depth(start_entity_id, max_hops)
1538            .await
1539            .map(|(e, ed, _)| (e, ed))
1540    }
1541
1542    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1543    ///
1544    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1545    /// entity. The start entity itself has depth 0.
1546    ///
1547    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
1548    ///
1549    /// # Errors
1550    ///
1551    /// Returns an error if any database query fails.
1552    pub async fn bfs_with_depth(
1553        &self,
1554        start_entity_id: i64,
1555        max_hops: u32,
1556    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1557        self.bfs_core(start_entity_id, max_hops, None).await
1558    }
1559
1560    /// BFS traversal considering only edges that were valid at `timestamp`.
1561    ///
1562    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1563    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1564    ///
1565    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1566    ///
1567    /// # Errors
1568    ///
1569    /// Returns an error if any database query fails.
1570    pub async fn bfs_at_timestamp(
1571        &self,
1572        start_entity_id: i64,
1573        max_hops: u32,
1574        timestamp: &str,
1575    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1576        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1577            .await
1578    }
1579
1580    /// BFS traversal scoped to specific MAGMA edge types.
1581    ///
1582    /// When `edge_types` is empty, behaves identically to [`bfs_with_depth`] (traverses all
1583    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1584    /// matches one of the provided types.
1585    ///
1586    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1587    /// edges, a temporal query only temporal + semantic edges, etc.
1588    ///
1589    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1590    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1591    ///
1592    /// # Errors
1593    ///
1594    /// Returns an error if any database query fails.
1595    pub async fn bfs_typed(
1596        &self,
1597        start_entity_id: i64,
1598        max_hops: u32,
1599        edge_types: &[EdgeType],
1600    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1601        if edge_types.is_empty() {
1602            return self.bfs_with_depth(start_entity_id, max_hops).await;
1603        }
1604        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1605            .await
1606    }
1607
1608    /// Shared BFS implementation.
1609    ///
1610    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1611    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1612    ///
1613    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1614    /// format string, so there is no SQL injection risk.
1615    async fn bfs_core(
1616        &self,
1617        start_entity_id: i64,
1618        max_hops: u32,
1619        at_timestamp: Option<&str>,
1620    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1621        use std::collections::HashMap;
1622
1623        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1624        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1625        const MAX_FRONTIER: usize = 300;
1626
1627        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1628        let mut frontier: Vec<i64> = vec![start_entity_id];
1629        depth_map.insert(start_entity_id, 0);
1630
1631        for hop in 0..max_hops {
1632            if frontier.is_empty() {
1633                break;
1634            }
1635            frontier.truncate(MAX_FRONTIER);
1636            // IDs come from our own DB — no user input, no injection risk.
1637            // Three copies of frontier IDs: positions 1..n, n+1..2n, 2n+1..3n.
1638            // Timestamp (if any) follows at position 3n+1.
1639            let n = frontier.len();
1640            let ph1 = placeholder_list(1, n);
1641            let ph2 = placeholder_list(n + 1, n);
1642            let ph3 = placeholder_list(n * 2 + 1, n);
1643            let edge_filter = if at_timestamp.is_some() {
1644                let ts_pos = n * 3 + 1;
1645                format!(
1646                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1647                    ts = numbered_placeholder(ts_pos),
1648                )
1649            } else {
1650                "valid_to IS NULL".to_owned()
1651            };
1652            let neighbour_sql = format!(
1653                "SELECT DISTINCT CASE
1654                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1655                     ELSE source_entity_id
1656                 END as neighbour_id
1657                 FROM graph_edges
1658                 WHERE {edge_filter}
1659                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1660            );
1661            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1662            for id in &frontier {
1663                q = q.bind(*id);
1664            }
1665            for id in &frontier {
1666                q = q.bind(*id);
1667            }
1668            for id in &frontier {
1669                q = q.bind(*id);
1670            }
1671            if let Some(ts) = at_timestamp {
1672                q = q.bind(ts);
1673            }
1674            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1675            let mut next_frontier: Vec<i64> = Vec::new();
1676            for nbr in neighbours {
1677                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1678                    e.insert(hop + 1);
1679                    next_frontier.push(nbr);
1680                }
1681            }
1682            frontier = next_frontier;
1683        }
1684
1685        self.bfs_fetch_results(depth_map, at_timestamp).await
1686    }
1687
1688    /// BFS implementation scoped to specific edge types.
1689    ///
1690    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
1691    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
1692    ///
1693    /// # Errors
1694    ///
1695    /// Returns an error if any database query fails.
1696    async fn bfs_core_typed(
1697        &self,
1698        start_entity_id: i64,
1699        max_hops: u32,
1700        at_timestamp: Option<&str>,
1701        edge_types: &[EdgeType],
1702    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1703        use std::collections::HashMap;
1704
1705        const MAX_FRONTIER: usize = 300;
1706
1707        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1708
1709        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1710        let mut frontier: Vec<i64> = vec![start_entity_id];
1711        depth_map.insert(start_entity_id, 0);
1712
1713        let n_types = type_strs.len();
1714        // type_in is constant for the entire BFS — positions 1..=n_types never change.
1715        let type_in = placeholder_list(1, n_types);
1716        let id_start = n_types + 1;
1717
1718        for hop in 0..max_hops {
1719            if frontier.is_empty() {
1720                break;
1721            }
1722            frontier.truncate(MAX_FRONTIER);
1723
1724            let n_frontier = frontier.len();
1725            // Positions: types first (1..n_types), then 3 copies of frontier IDs.
1726            let fp1 = placeholder_list(id_start, n_frontier);
1727            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1728            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1729
1730            let edge_filter = if at_timestamp.is_some() {
1731                let ts_pos = id_start + n_frontier * 3;
1732                format!(
1733                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1734                    ts = numbered_placeholder(ts_pos),
1735                )
1736            } else {
1737                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1738            };
1739
1740            let neighbour_sql = format!(
1741                "SELECT DISTINCT CASE
1742                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1743                     ELSE source_entity_id
1744                 END as neighbour_id
1745                 FROM graph_edges
1746                 WHERE {edge_filter}
1747                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1748            );
1749
1750            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1751            // Bind types first
1752            for t in &type_strs {
1753                q = q.bind(*t);
1754            }
1755            // Bind frontier 3 times
1756            for id in &frontier {
1757                q = q.bind(*id);
1758            }
1759            for id in &frontier {
1760                q = q.bind(*id);
1761            }
1762            for id in &frontier {
1763                q = q.bind(*id);
1764            }
1765            if let Some(ts) = at_timestamp {
1766                q = q.bind(ts);
1767            }
1768
1769            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1770            let mut next_frontier: Vec<i64> = Vec::new();
1771            for nbr in neighbours {
1772                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1773                    e.insert(hop + 1);
1774                    next_frontier.push(nbr);
1775                }
1776            }
1777            frontier = next_frontier;
1778        }
1779
1780        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
1781        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1782            .await
1783    }
1784
1785    /// Fetch entities and typed edges for a completed BFS depth map.
1786    ///
1787    /// Filters returned edges by the provided `edge_type` strings.
1788    ///
1789    /// # Errors
1790    ///
1791    /// Returns an error if any database query fails.
1792    async fn bfs_fetch_results_typed(
1793        &self,
1794        depth_map: std::collections::HashMap<i64, u32>,
1795        at_timestamp: Option<&str>,
1796        type_strs: &[&str],
1797    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1798        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1799        if visited_ids.is_empty() {
1800            return Ok((Vec::new(), Vec::new(), depth_map));
1801        }
1802        if visited_ids.len() > 499 {
1803            tracing::warn!(
1804                total = visited_ids.len(),
1805                retained = 499,
1806                "bfs_fetch_results_typed: visited entity set truncated to 499"
1807            );
1808            visited_ids.truncate(499);
1809        }
1810
1811        let n_types = type_strs.len();
1812        let n_visited = visited_ids.len();
1813
1814        // Bind order: types first (1..=n_types), then visited_ids twice, then optional timestamp.
1815        let type_in = placeholder_list(1, n_types);
1816        let id_start = n_types + 1;
1817        let ph_ids1 = placeholder_list(id_start, n_visited);
1818        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1819
1820        let edge_filter = if at_timestamp.is_some() {
1821            let ts_pos = id_start + n_visited * 2;
1822            format!(
1823                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1824                ts = numbered_placeholder(ts_pos),
1825            )
1826        } else {
1827            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1828        };
1829
1830        let edge_sql = format!(
1831            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1832                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1833                    edge_type, retrieval_count, last_retrieved_at
1834             FROM graph_edges
1835             WHERE {edge_filter}
1836               AND source_entity_id IN ({ph_ids1})
1837               AND target_entity_id IN ({ph_ids2})"
1838        );
1839        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1840        for t in type_strs {
1841            edge_query = edge_query.bind(*t);
1842        }
1843        for id in &visited_ids {
1844            edge_query = edge_query.bind(*id);
1845        }
1846        for id in &visited_ids {
1847            edge_query = edge_query.bind(*id);
1848        }
1849        if let Some(ts) = at_timestamp {
1850            edge_query = edge_query.bind(ts);
1851        }
1852        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1853
1854        // For entity query, use plain sequential bind positions (no type prefix offset)
1855        let entity_sql2 = format!(
1856            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1857             FROM graph_entities WHERE id IN ({ph})",
1858            ph = placeholder_list(1, visited_ids.len()),
1859        );
1860        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1861        for id in &visited_ids {
1862            entity_query = entity_query.bind(*id);
1863        }
1864        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1865
1866        let entities: Vec<Entity> = entity_rows
1867            .into_iter()
1868            .map(entity_from_row)
1869            .collect::<Result<Vec<_>, _>>()?;
1870        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1871
1872        Ok((entities, edges, depth_map))
1873    }
1874
1875    /// Fetch entities and edges for a completed BFS depth map.
1876    async fn bfs_fetch_results(
1877        &self,
1878        depth_map: std::collections::HashMap<i64, u32>,
1879        at_timestamp: Option<&str>,
1880    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1881        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1882        if visited_ids.is_empty() {
1883            return Ok((Vec::new(), Vec::new(), depth_map));
1884        }
1885        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1886        if visited_ids.len() > 499 {
1887            tracing::warn!(
1888                total = visited_ids.len(),
1889                retained = 499,
1890                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1891                 some reachable entities will be dropped from results"
1892            );
1893            visited_ids.truncate(499);
1894        }
1895
1896        let n = visited_ids.len();
1897        let ph_ids1 = placeholder_list(1, n);
1898        let ph_ids2 = placeholder_list(n + 1, n);
1899        let edge_filter = if at_timestamp.is_some() {
1900            let ts_pos = n * 2 + 1;
1901            format!(
1902                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1903                ts = numbered_placeholder(ts_pos),
1904            )
1905        } else {
1906            "valid_to IS NULL".to_owned()
1907        };
1908        let edge_sql = format!(
1909            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1910                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1911                    edge_type, retrieval_count, last_retrieved_at
1912             FROM graph_edges
1913             WHERE {edge_filter}
1914               AND source_entity_id IN ({ph_ids1})
1915               AND target_entity_id IN ({ph_ids2})"
1916        );
1917        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1918        for id in &visited_ids {
1919            edge_query = edge_query.bind(*id);
1920        }
1921        for id in &visited_ids {
1922            edge_query = edge_query.bind(*id);
1923        }
1924        if let Some(ts) = at_timestamp {
1925            edge_query = edge_query.bind(ts);
1926        }
1927        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1928
1929        let entity_sql = format!(
1930            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1931             FROM graph_entities WHERE id IN ({ph})",
1932            ph = placeholder_list(1, n),
1933        );
1934        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
1935        for id in &visited_ids {
1936            entity_query = entity_query.bind(*id);
1937        }
1938        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1939
1940        let entities: Vec<Entity> = entity_rows
1941            .into_iter()
1942            .map(entity_from_row)
1943            .collect::<Result<Vec<_>, _>>()?;
1944        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1945
1946        Ok((entities, edges, depth_map))
1947    }
1948
1949    // ── Backfill helpers ──────────────────────────────────────────────────────
1950
1951    /// Find an entity by name only (no type filter).
1952    ///
1953    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1954    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1955    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1956    ///
1957    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1958    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1959    /// contains "Alice").
1960    ///
1961    /// # Errors
1962    ///
1963    /// Returns an error if the database query fails.
1964    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1965        let find_by_name_sql = format!(
1966            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
1967             FROM graph_entities \
1968             WHERE name = ? {cn} OR canonical_name = ? {cn} \
1969             LIMIT 5",
1970            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1971        );
1972        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
1973            .bind(name)
1974            .bind(name)
1975            .fetch_all(&self.pool)
1976            .await?;
1977
1978        if !rows.is_empty() {
1979            return rows.into_iter().map(entity_from_row).collect();
1980        }
1981
1982        self.find_entities_fuzzy(name, 5).await
1983    }
1984
1985    /// Return up to `limit` messages that have not yet been processed by graph extraction.
1986    ///
1987    /// Reads the `graph_processed` column added by migration 021.
1988    ///
1989    /// # Errors
1990    ///
1991    /// Returns an error if the database query fails.
1992    pub async fn unprocessed_messages_for_backfill(
1993        &self,
1994        limit: usize,
1995    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1996        let limit = i64::try_from(limit)?;
1997        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
1998            "SELECT id, content FROM messages
1999             WHERE graph_processed = 0
2000             ORDER BY id ASC
2001             LIMIT ?"
2002        ))
2003        .bind(limit)
2004        .fetch_all(&self.pool)
2005        .await?;
2006        Ok(rows
2007            .into_iter()
2008            .map(|(id, content)| (crate::types::MessageId(id), content))
2009            .collect())
2010    }
2011
2012    /// Return the count of messages not yet processed by graph extraction.
2013    ///
2014    /// # Errors
2015    ///
2016    /// Returns an error if the database query fails.
2017    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2018        let count: i64 = zeph_db::query_scalar(sql!(
2019            "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2020        ))
2021        .fetch_one(&self.pool)
2022        .await?;
2023        Ok(count)
2024    }
2025
2026    /// Mark a batch of messages as graph-processed.
2027    ///
2028    /// # Errors
2029    ///
2030    /// Returns an error if the database query fails.
2031    pub async fn mark_messages_graph_processed(
2032        &self,
2033        ids: &[crate::types::MessageId],
2034    ) -> Result<(), MemoryError> {
2035        const MAX_BATCH: usize = 490;
2036        if ids.is_empty() {
2037            return Ok(());
2038        }
2039        for chunk in ids.chunks(MAX_BATCH) {
2040            let placeholders = placeholder_list(1, chunk.len());
2041            let sql =
2042                format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2043            let mut query = zeph_db::query(&sql);
2044            for id in chunk {
2045                query = query.bind(id.0);
2046            }
2047            query.execute(&self.pool).await?;
2048        }
2049        Ok(())
2050    }
2051}
2052
2053// ── Dialect helpers ───────────────────────────────────────────────────────────
2054
2055#[cfg(feature = "sqlite")]
2056fn community_ids_sql(placeholders: &str) -> String {
2057    format!(
2058        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
2059         FROM graph_communities c, json_each(c.entity_ids) j
2060         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
2061    )
2062}
2063
2064#[cfg(feature = "postgres")]
2065fn community_ids_sql(placeholders: &str) -> String {
2066    format!(
2067        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
2068         FROM graph_communities c,
2069              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
2070         WHERE (j.value)::bigint IN ({placeholders})"
2071    )
2072}
2073
2074// ── Row types for zeph_db::query_as ─────────────────────────────────────────────
2075
2076#[derive(zeph_db::FromRow)]
2077struct EntityRow {
2078    id: i64,
2079    name: String,
2080    canonical_name: String,
2081    entity_type: String,
2082    summary: Option<String>,
2083    first_seen_at: String,
2084    last_seen_at: String,
2085    qdrant_point_id: Option<String>,
2086}
2087
2088fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2089    let entity_type = row
2090        .entity_type
2091        .parse::<EntityType>()
2092        .map_err(MemoryError::GraphStore)?;
2093    Ok(Entity {
2094        id: row.id,
2095        name: row.name,
2096        canonical_name: row.canonical_name,
2097        entity_type,
2098        summary: row.summary,
2099        first_seen_at: row.first_seen_at,
2100        last_seen_at: row.last_seen_at,
2101        qdrant_point_id: row.qdrant_point_id,
2102    })
2103}
2104
2105#[derive(zeph_db::FromRow)]
2106struct AliasRow {
2107    id: i64,
2108    entity_id: i64,
2109    alias_name: String,
2110    created_at: String,
2111}
2112
2113fn alias_from_row(row: AliasRow) -> EntityAlias {
2114    EntityAlias {
2115        id: row.id,
2116        entity_id: row.entity_id,
2117        alias_name: row.alias_name,
2118        created_at: row.created_at,
2119    }
2120}
2121
2122#[derive(zeph_db::FromRow)]
2123struct EdgeRow {
2124    id: i64,
2125    source_entity_id: i64,
2126    target_entity_id: i64,
2127    relation: String,
2128    fact: String,
2129    confidence: f64,
2130    valid_from: String,
2131    valid_to: Option<String>,
2132    created_at: String,
2133    expired_at: Option<String>,
2134    episode_id: Option<i64>,
2135    qdrant_point_id: Option<String>,
2136    edge_type: String,
2137    retrieval_count: i32,
2138    last_retrieved_at: Option<i64>,
2139}
2140
2141fn edge_from_row(row: EdgeRow) -> Edge {
2142    let edge_type = row
2143        .edge_type
2144        .parse::<EdgeType>()
2145        .unwrap_or(EdgeType::Semantic);
2146    Edge {
2147        id: row.id,
2148        source_entity_id: row.source_entity_id,
2149        target_entity_id: row.target_entity_id,
2150        relation: row.relation,
2151        fact: row.fact,
2152        #[allow(clippy::cast_possible_truncation)]
2153        confidence: row.confidence as f32,
2154        valid_from: row.valid_from,
2155        valid_to: row.valid_to,
2156        created_at: row.created_at,
2157        expired_at: row.expired_at,
2158        episode_id: row.episode_id.map(MessageId),
2159        qdrant_point_id: row.qdrant_point_id,
2160        edge_type,
2161        retrieval_count: row.retrieval_count,
2162        last_retrieved_at: row.last_retrieved_at,
2163    }
2164}
2165
2166#[derive(zeph_db::FromRow)]
2167struct CommunityRow {
2168    id: i64,
2169    name: String,
2170    summary: String,
2171    entity_ids: String,
2172    fingerprint: Option<String>,
2173    created_at: String,
2174    updated_at: String,
2175}
2176
2177// ── Tests ─────────────────────────────────────────────────────────────────────
2178
2179#[cfg(test)]
2180mod tests;