Skip to main content

zeph_memory/sqlite/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Turn free-form text into a string that is safe to pass to an FTS5 `MATCH`.
///
/// Every character that is not alphanumeric acts as a separator; the
/// non-empty runs between separators are kept and re-joined with single
/// spaces. As a result FTS5 syntax characters such as `"`, `*`, `(`, `)`,
/// `^`, `-`, `+` and `:` never reach the query parser.
///
/// Note: the FTS5 keywords `AND`, `OR`, `NOT` and `NEAR` are ordinary
/// alphanumeric tokens and pass through unchanged, in their original case.
/// Callers that must not have them interpreted as operators need to drop
/// them on their own (see `find_entities_fuzzy` in `graph/store.rs`).
pub(crate) fn sanitize_fts5_query(query: &str) -> String {
    let mut sanitized = String::with_capacity(query.len());
    for token in query.split(|ch: char| !ch.is_alphanumeric()) {
        if token.is_empty() {
            continue;
        }
        // Separate tokens with exactly one space, never a leading one.
        if !sanitized.is_empty() {
            sanitized.push(' ');
        }
        sanitized.push_str(token);
    }
    sanitized
}
26
27fn parse_role(s: &str) -> Role {
28    match s {
29        "assistant" => Role::Assistant,
30        "system" => Role::System,
31        _ => Role::User,
32    }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37    match role {
38        Role::System => "system",
39        Role::User => "user",
40        Role::Assistant => "assistant",
41    }
42}
43
44/// Deserialize message parts from a stored JSON string.
45///
46/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
47/// a truncated excerpt of the malformed JSON for diagnostics.
48fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
49    if parts_json == "[]" {
50        return vec![];
51    }
52    match serde_json::from_str(parts_json) {
53        Ok(p) => p,
54        Err(e) => {
55            let truncated = parts_json.chars().take(120).collect::<String>();
56            tracing::warn!(
57                role = %role_str,
58                parts_json = %truncated,
59                error = %e,
60                "failed to deserialize message parts, falling back to empty"
61            );
62            vec![]
63        }
64    }
65}
66
67impl SqliteStore {
68    /// Create a new conversation and return its ID.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the insert fails.
73    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
74        let row: (ConversationId,) =
75            sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
76                .fetch_one(&self.pool)
77                .await?;
78        Ok(row.0)
79    }
80
81    /// Save a message to the given conversation and return the message ID.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the insert fails.
86    pub async fn save_message(
87        &self,
88        conversation_id: ConversationId,
89        role: &str,
90        content: &str,
91    ) -> Result<MessageId, MemoryError> {
92        self.save_message_with_parts(conversation_id, role, content, "[]")
93            .await
94    }
95
96    /// Save a message with structured parts JSON.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the insert fails.
101    pub async fn save_message_with_parts(
102        &self,
103        conversation_id: ConversationId,
104        role: &str,
105        content: &str,
106        parts_json: &str,
107    ) -> Result<MessageId, MemoryError> {
108        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
109            .await
110    }
111
112    /// Save a message with visibility metadata.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the insert fails.
117    pub async fn save_message_with_metadata(
118        &self,
119        conversation_id: ConversationId,
120        role: &str,
121        content: &str,
122        parts_json: &str,
123        agent_visible: bool,
124        user_visible: bool,
125    ) -> Result<MessageId, MemoryError> {
126        let importance_score = crate::semantic::importance::compute_importance(content, role);
127        let row: (MessageId,) = sqlx::query_as(
128            "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
129             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
130        )
131        .bind(conversation_id)
132        .bind(role)
133        .bind(content)
134        .bind(parts_json)
135        .bind(i64::from(agent_visible))
136        .bind(i64::from(user_visible))
137        .bind(importance_score)
138        .fetch_one(&self.pool)
139        .await?;
140        Ok(row.0)
141    }
142
143    /// Load the most recent messages for a conversation, up to `limit`.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the query fails.
148    pub async fn load_history(
149        &self,
150        conversation_id: ConversationId,
151        limit: u32,
152    ) -> Result<Vec<Message>, MemoryError> {
153        let rows: Vec<(String, String, String, i64, i64, i64)> = sqlx::query_as(
154            "SELECT role, content, parts, agent_visible, user_visible, id FROM (\
155                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
156                WHERE conversation_id = ? AND deleted_at IS NULL \
157                ORDER BY id DESC \
158                LIMIT ?\
159             ) ORDER BY id ASC",
160        )
161        .bind(conversation_id)
162        .bind(limit)
163        .fetch_all(&self.pool)
164        .await?;
165
166        let messages = rows
167            .into_iter()
168            .map(
169                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
170                    let parts = parse_parts_json(&role_str, &parts_json);
171                    Message {
172                        role: parse_role(&role_str),
173                        content,
174                        parts,
175                        metadata: MessageMetadata {
176                            agent_visible: agent_visible != 0,
177                            user_visible: user_visible != 0,
178                            compacted_at: None,
179                            deferred_summary: None,
180                            focus_pinned: false,
181                            focus_marker_id: None,
182                            db_id: Some(row_id),
183                        },
184                    }
185                },
186            )
187            .collect();
188        Ok(messages)
189    }
190
191    /// Load messages filtered by visibility flags.
192    ///
193    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the query fails.
198    pub async fn load_history_filtered(
199        &self,
200        conversation_id: ConversationId,
201        limit: u32,
202        agent_visible: Option<bool>,
203        user_visible: Option<bool>,
204    ) -> Result<Vec<Message>, MemoryError> {
205        let av = agent_visible.map(i64::from);
206        let uv = user_visible.map(i64::from);
207
208        let rows: Vec<(String, String, String, i64, i64, i64)> = sqlx::query_as(
209            "WITH recent AS (\
210                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
211                WHERE conversation_id = ? \
212                  AND deleted_at IS NULL \
213                  AND (? IS NULL OR agent_visible = ?) \
214                  AND (? IS NULL OR user_visible = ?) \
215                ORDER BY id DESC \
216                LIMIT ?\
217             ) SELECT role, content, parts, agent_visible, user_visible, id FROM recent ORDER BY id ASC",
218        )
219        .bind(conversation_id)
220        .bind(av)
221        .bind(av)
222        .bind(uv)
223        .bind(uv)
224        .bind(limit)
225        .fetch_all(&self.pool)
226        .await?;
227
228        let messages = rows
229            .into_iter()
230            .map(
231                |(role_str, content, parts_json, agent_visible, user_visible, row_id)| {
232                    let parts = parse_parts_json(&role_str, &parts_json);
233                    Message {
234                        role: parse_role(&role_str),
235                        content,
236                        parts,
237                        metadata: MessageMetadata {
238                            agent_visible: agent_visible != 0,
239                            user_visible: user_visible != 0,
240                            compacted_at: None,
241                            deferred_summary: None,
242                            focus_pinned: false,
243                            focus_marker_id: None,
244                            db_id: Some(row_id),
245                        },
246                    }
247                },
248            )
249            .collect();
250        Ok(messages)
251    }
252
253    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
254    ///
255    /// Within a single transaction:
256    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
257    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
258    ///
259    /// Returns the `MessageId` of the inserted summary.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the transaction fails.
264    pub async fn replace_conversation(
265        &self,
266        conversation_id: ConversationId,
267        compacted_range: std::ops::RangeInclusive<MessageId>,
268        summary_role: &str,
269        summary_content: &str,
270    ) -> Result<MessageId, MemoryError> {
271        let now = {
272            let secs = std::time::SystemTime::now()
273                .duration_since(std::time::UNIX_EPOCH)
274                .unwrap_or_default()
275                .as_secs();
276            format!("{secs}")
277        };
278        let start_id = compacted_range.start().0;
279        let end_id = compacted_range.end().0;
280
281        let mut tx = self.pool.begin().await?;
282
283        sqlx::query(
284            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
285             WHERE conversation_id = ? AND id >= ? AND id <= ?",
286        )
287        .bind(&now)
288        .bind(conversation_id)
289        .bind(start_id)
290        .bind(end_id)
291        .execute(&mut *tx)
292        .await?;
293
294        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
295        let row: (MessageId,) = sqlx::query_as(
296            "INSERT INTO messages \
297             (conversation_id, role, content, parts, agent_visible, user_visible) \
298             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
299        )
300        .bind(conversation_id)
301        .bind(summary_role)
302        .bind(summary_content)
303        .fetch_one(&mut *tx)
304        .await?;
305
306        tx.commit().await?;
307
308        Ok(row.0)
309    }
310
311    /// Atomically hide `tool_use/tool_result` message pairs and insert summary messages.
312    ///
313    /// Within a single transaction:
314    /// 1. Sets `agent_visible=0, compacted_at=<now>` for each ID in `hide_ids`.
315    /// 2. Inserts each text in `summaries` as a new agent-only assistant message.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the transaction fails.
320    pub async fn apply_tool_pair_summaries(
321        &self,
322        conversation_id: ConversationId,
323        hide_ids: &[i64],
324        summaries: &[String],
325    ) -> Result<(), MemoryError> {
326        if hide_ids.is_empty() && summaries.is_empty() {
327            return Ok(());
328        }
329
330        let now = std::time::SystemTime::now()
331            .duration_since(std::time::UNIX_EPOCH)
332            .unwrap_or_default()
333            .as_secs()
334            .to_string();
335
336        let mut tx = self.pool.begin().await?;
337
338        for &id in hide_ids {
339            sqlx::query("UPDATE messages SET agent_visible = 0, compacted_at = ? WHERE id = ?")
340                .bind(&now)
341                .bind(id)
342                .execute(&mut *tx)
343                .await?;
344        }
345
346        for summary in summaries {
347            let content = format!("[tool summary] {summary}");
348            let parts = serde_json::to_string(&[MessagePart::Summary {
349                text: summary.clone(),
350            }])
351            .unwrap_or_else(|_| "[]".to_string());
352            sqlx::query(
353                "INSERT INTO messages \
354                 (conversation_id, role, content, parts, agent_visible, user_visible) \
355                 VALUES (?, 'assistant', ?, ?, 1, 0)",
356            )
357            .bind(conversation_id)
358            .bind(&content)
359            .bind(&parts)
360            .execute(&mut *tx)
361            .await?;
362        }
363
364        tx.commit().await?;
365        Ok(())
366    }
367
368    /// Return the IDs of the N oldest messages in a conversation (ascending order).
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the query fails.
373    pub async fn oldest_message_ids(
374        &self,
375        conversation_id: ConversationId,
376        n: u32,
377    ) -> Result<Vec<MessageId>, MemoryError> {
378        let rows: Vec<(MessageId,)> = sqlx::query_as(
379            "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
380        )
381        .bind(conversation_id)
382        .bind(n)
383        .fetch_all(&self.pool)
384        .await?;
385        Ok(rows.into_iter().map(|r| r.0).collect())
386    }
387
388    /// Return the ID of the most recent conversation, if any.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the query fails.
393    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
394        let row: Option<(ConversationId,)> =
395            sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
396                .fetch_optional(&self.pool)
397                .await?;
398        Ok(row.map(|r| r.0))
399    }
400
401    /// Fetch a single message by its ID.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the query fails.
406    pub async fn message_by_id(
407        &self,
408        message_id: MessageId,
409    ) -> Result<Option<Message>, MemoryError> {
410        let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
411            "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
412        )
413        .bind(message_id)
414        .fetch_optional(&self.pool)
415        .await?;
416
417        Ok(row.map(
418            |(role_str, content, parts_json, agent_visible, user_visible)| {
419                let parts = parse_parts_json(&role_str, &parts_json);
420                Message {
421                    role: parse_role(&role_str),
422                    content,
423                    parts,
424                    metadata: MessageMetadata {
425                        agent_visible: agent_visible != 0,
426                        user_visible: user_visible != 0,
427                        compacted_at: None,
428                        deferred_summary: None,
429                        focus_pinned: false,
430                        focus_marker_id: None,
431                        db_id: None,
432                    },
433                }
434            },
435        ))
436    }
437
438    /// Fetch messages by a list of IDs in a single query.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the query fails.
443    pub async fn messages_by_ids(
444        &self,
445        ids: &[MessageId],
446    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
447        if ids.is_empty() {
448            return Ok(Vec::new());
449        }
450
451        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
452
453        let query = format!(
454            "SELECT id, role, content, parts FROM messages \
455             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
456        );
457        let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
458        for &id in ids {
459            q = q.bind(id);
460        }
461
462        let rows = q.fetch_all(&self.pool).await?;
463
464        Ok(rows
465            .into_iter()
466            .map(|(id, role_str, content, parts_json)| {
467                let parts = parse_parts_json(&role_str, &parts_json);
468                (
469                    id,
470                    Message {
471                        role: parse_role(&role_str),
472                        content,
473                        parts,
474                        metadata: MessageMetadata::default(),
475                    },
476                )
477            })
478            .collect())
479    }
480
481    /// Return message IDs and content for messages without embeddings.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the query fails.
486    pub async fn unembedded_message_ids(
487        &self,
488        limit: Option<usize>,
489    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
490        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
491
492        let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
493            "SELECT m.id, m.conversation_id, m.role, m.content \
494             FROM messages m \
495             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
496             WHERE em.id IS NULL AND m.deleted_at IS NULL \
497             ORDER BY m.id ASC \
498             LIMIT ?",
499        )
500        .bind(effective_limit)
501        .fetch_all(&self.pool)
502        .await?;
503
504        Ok(rows)
505    }
506
507    /// Count the number of messages in a conversation.
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if the query fails.
512    pub async fn count_messages(
513        &self,
514        conversation_id: ConversationId,
515    ) -> Result<i64, MemoryError> {
516        let row: (i64,) = sqlx::query_as(
517            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
518        )
519        .bind(conversation_id)
520        .fetch_one(&self.pool)
521        .await?;
522        Ok(row.0)
523    }
524
525    /// Count messages in a conversation with id greater than `after_id`.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if the query fails.
530    pub async fn count_messages_after(
531        &self,
532        conversation_id: ConversationId,
533        after_id: MessageId,
534    ) -> Result<i64, MemoryError> {
535        let row: (i64,) =
536            sqlx::query_as(
537                "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
538            )
539            .bind(conversation_id)
540            .bind(after_id)
541            .fetch_one(&self.pool)
542            .await?;
543        Ok(row.0)
544    }
545
546    /// Full-text keyword search over messages using FTS5.
547    ///
548    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
549    /// negated to positive for consistency with vector scores).
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the query fails.
554    pub async fn keyword_search(
555        &self,
556        query: &str,
557        limit: usize,
558        conversation_id: Option<ConversationId>,
559    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
560        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
561        let safe_query = sanitize_fts5_query(query);
562        if safe_query.is_empty() {
563            return Ok(Vec::new());
564        }
565
566        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
567            sqlx::query_as(
568                "SELECT m.id, -rank AS score \
569                 FROM messages_fts f \
570                 JOIN messages m ON m.id = f.rowid \
571                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
572                 ORDER BY rank \
573                 LIMIT ?",
574            )
575            .bind(&safe_query)
576            .bind(cid)
577            .bind(effective_limit)
578            .fetch_all(&self.pool)
579            .await?
580        } else {
581            sqlx::query_as(
582                "SELECT m.id, -rank AS score \
583                 FROM messages_fts f \
584                 JOIN messages m ON m.id = f.rowid \
585                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
586                 ORDER BY rank \
587                 LIMIT ?",
588            )
589            .bind(&safe_query)
590            .bind(effective_limit)
591            .fetch_all(&self.pool)
592            .await?
593        };
594
595        Ok(rows)
596    }
597
598    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
599    ///
600    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
601    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
602    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
603    ///
604    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
605    /// `None` means "no bound" on that side.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if the query fails.
610    pub async fn keyword_search_with_time_range(
611        &self,
612        query: &str,
613        limit: usize,
614        conversation_id: Option<ConversationId>,
615        after: Option<&str>,
616        before: Option<&str>,
617    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
618        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
619        let safe_query = sanitize_fts5_query(query);
620        if safe_query.is_empty() {
621            return Ok(Vec::new());
622        }
623
624        // Build time-range clauses dynamically. Both bounds are optional.
625        let after_clause = if after.is_some() {
626            " AND m.created_at > ?"
627        } else {
628            ""
629        };
630        let before_clause = if before.is_some() {
631            " AND m.created_at < ?"
632        } else {
633            ""
634        };
635        let conv_clause = if conversation_id.is_some() {
636            " AND m.conversation_id = ?"
637        } else {
638            ""
639        };
640
641        let sql = format!(
642            "SELECT m.id, -rank AS score \
643             FROM messages_fts f \
644             JOIN messages m ON m.id = f.rowid \
645             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
646             {after_clause}{before_clause}{conv_clause} \
647             ORDER BY rank \
648             LIMIT ?"
649        );
650
651        let mut q = sqlx::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
652        if let Some(a) = after {
653            q = q.bind(a);
654        }
655        if let Some(b) = before {
656            q = q.bind(b);
657        }
658        if let Some(cid) = conversation_id {
659            q = q.bind(cid);
660        }
661        q = q.bind(effective_limit);
662
663        Ok(q.fetch_all(&self.pool).await?)
664    }
665
666    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
667    ///
668    /// Messages without a `created_at` column fall back to 0.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if the query fails.
673    pub async fn message_timestamps(
674        &self,
675        ids: &[MessageId],
676    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
677        if ids.is_empty() {
678            return Ok(std::collections::HashMap::new());
679        }
680
681        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
682        let query = format!(
683            "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
684             FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
685        );
686        let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
687        for &id in ids {
688            q = q.bind(id);
689        }
690
691        let rows = q.fetch_all(&self.pool).await?;
692        Ok(rows.into_iter().collect())
693    }
694
695    /// Load a range of messages after a given message ID.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the query fails.
700    pub async fn load_messages_range(
701        &self,
702        conversation_id: ConversationId,
703        after_message_id: MessageId,
704        limit: usize,
705    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
706        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
707
708        let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
709            "SELECT id, role, content FROM messages \
710             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
711             ORDER BY id ASC LIMIT ?",
712        )
713        .bind(conversation_id)
714        .bind(after_message_id)
715        .bind(effective_limit)
716        .fetch_all(&self.pool)
717        .await?;
718
719        Ok(rows)
720    }
721
722    // ── Eviction helpers ──────────────────────────────────────────────────────
723
724    /// Return all non-deleted message IDs with their eviction metadata.
725    ///
726    /// # Errors
727    ///
728    /// Returns an error if the query fails.
729    pub async fn get_eviction_candidates(
730        &self,
731    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
732        let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
733            "SELECT id, created_at, last_accessed, access_count \
734             FROM messages WHERE deleted_at IS NULL",
735        )
736        .fetch_all(&self.pool)
737        .await?;
738
739        Ok(rows
740            .into_iter()
741            .map(
742                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
743                    id,
744                    created_at,
745                    last_accessed,
746                    access_count: access_count.try_into().unwrap_or(0),
747                },
748            )
749            .collect())
750    }
751
752    /// Soft-delete a set of messages by marking `deleted_at`.
753    ///
754    /// Soft-deleted messages are excluded from all history queries.
755    ///
756    /// # Errors
757    ///
758    /// Returns an error if the update fails.
759    pub async fn soft_delete_messages(
760        &self,
761        ids: &[MessageId],
762    ) -> Result<(), crate::error::MemoryError> {
763        if ids.is_empty() {
764            return Ok(());
765        }
766        // SQLite does not support array binding natively. Batch via individual updates.
767        for &id in ids {
768            sqlx::query(
769                "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
770            )
771            .bind(id)
772            .execute(&self.pool)
773            .await?;
774        }
775        Ok(())
776    }
777
778    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if the query fails.
783    pub async fn get_soft_deleted_message_ids(
784        &self,
785    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
786        let rows: Vec<(MessageId,)> = sqlx::query_as(
787            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
788        )
789        .fetch_all(&self.pool)
790        .await?;
791        Ok(rows.into_iter().map(|(id,)| id).collect())
792    }
793
794    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
795    ///
796    /// # Errors
797    ///
798    /// Returns an error if the update fails.
799    pub async fn mark_qdrant_cleaned(
800        &self,
801        ids: &[MessageId],
802    ) -> Result<(), crate::error::MemoryError> {
803        for &id in ids {
804            sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
805                .bind(id)
806                .execute(&self.pool)
807                .await?;
808        }
809        Ok(())
810    }
811
812    /// Fetch `importance_score` values for the given message IDs.
813    ///
814    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
815    ///
816    /// # Errors
817    ///
818    /// Returns an error if the query fails.
819    pub async fn fetch_importance_scores(
820        &self,
821        ids: &[MessageId],
822    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
823        if ids.is_empty() {
824            return Ok(std::collections::HashMap::new());
825        }
826        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
827        let query = format!(
828            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
829        );
830        let mut q = sqlx::query_as::<_, (MessageId, f64)>(&query);
831        for &id in ids {
832            q = q.bind(id);
833        }
834        let rows = q.fetch_all(&self.pool).await?;
835        Ok(rows.into_iter().collect())
836    }
837
838    /// Increment `access_count` and set `last_accessed = datetime('now')` for the given IDs.
839    ///
840    /// Skips the update when `ids` is empty.
841    ///
842    /// # Errors
843    ///
844    /// Returns an error if the update fails.
845    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
846        if ids.is_empty() {
847            return Ok(());
848        }
849        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
850        let query = format!(
851            "UPDATE messages SET access_count = access_count + 1, last_accessed = datetime('now') \
852             WHERE id IN ({placeholders})"
853        );
854        let mut q = sqlx::query(&query);
855        for &id in ids {
856            q = q.bind(id);
857        }
858        q.execute(&self.pool).await?;
859        Ok(())
860    }
861
862    // ── Tier promotion helpers ─────────────────────────────────────────────────
863
864    /// Return episodic messages with `session_count >= min_sessions`, ordered by
865    /// session count descending then importance score descending.
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the query fails.
870    pub async fn find_promotion_candidates(
871        &self,
872        min_sessions: u32,
873        batch_size: usize,
874    ) -> Result<Vec<PromotionCandidate>, MemoryError> {
875        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
876        let min = i64::from(min_sessions);
877        let rows: Vec<(MessageId, ConversationId, String, i64, f64)> = sqlx::query_as(
878            "SELECT id, conversation_id, content, session_count, importance_score \
879             FROM messages \
880             WHERE tier = 'episodic' AND session_count >= ? AND deleted_at IS NULL \
881             ORDER BY session_count DESC, importance_score DESC \
882             LIMIT ?",
883        )
884        .bind(min)
885        .bind(limit)
886        .fetch_all(&self.pool)
887        .await?;
888
889        Ok(rows
890            .into_iter()
891            .map(
892                |(id, conversation_id, content, session_count, importance_score)| {
893                    PromotionCandidate {
894                        id,
895                        conversation_id,
896                        content,
897                        session_count: session_count.try_into().unwrap_or(0),
898                        importance_score,
899                    }
900                },
901            )
902            .collect())
903    }
904
905    /// Count messages per tier (episodic, semantic) that are not deleted.
906    ///
907    /// Returns `(episodic_count, semantic_count)`.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the query fails.
912    pub async fn count_messages_by_tier(&self) -> Result<(i64, i64), MemoryError> {
913        let rows: Vec<(String, i64)> = sqlx::query_as(
914            "SELECT tier, COUNT(*) FROM messages \
915             WHERE deleted_at IS NULL AND tier IN ('episodic', 'semantic') \
916             GROUP BY tier",
917        )
918        .fetch_all(&self.pool)
919        .await?;
920
921        let mut episodic = 0i64;
922        let mut semantic = 0i64;
923        for (tier, count) in rows {
924            match tier.as_str() {
925                "episodic" => episodic = count,
926                "semantic" => semantic = count,
927                _ => {}
928            }
929        }
930        Ok((episodic, semantic))
931    }
932
933    /// Count semantic facts (tier='semantic', not deleted).
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if the query fails.
938    pub async fn count_semantic_facts(&self) -> Result<i64, MemoryError> {
939        let row: (i64,) = sqlx::query_as(
940            "SELECT COUNT(*) FROM messages WHERE tier = 'semantic' AND deleted_at IS NULL",
941        )
942        .fetch_one(&self.pool)
943        .await?;
944        Ok(row.0)
945    }
946
947    /// Promote a set of episodic messages to semantic tier in a single transaction.
948    ///
949    /// Within one transaction:
950    /// 1. Inserts a new message with `tier='semantic'` and `promotion_timestamp=unixepoch()`.
951    /// 2. Soft-deletes the original episodic messages and marks them `qdrant_cleaned=0`
952    ///    so the eviction sweep picks up their Qdrant vectors.
953    ///
954    /// Returns the `MessageId` of the new semantic message.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the transaction fails.
959    pub async fn promote_to_semantic(
960        &self,
961        conversation_id: ConversationId,
962        merged_content: &str,
963        original_ids: &[MessageId],
964    ) -> Result<MessageId, MemoryError> {
965        if original_ids.is_empty() {
966            return Err(MemoryError::Other(
967                "promote_to_semantic: original_ids must not be empty".into(),
968            ));
969        }
970
971        let mut tx = self.pool.begin().await?;
972
973        // Insert the new semantic fact.
974        let row: (MessageId,) = sqlx::query_as(
975            "INSERT INTO messages \
976             (conversation_id, role, content, parts, agent_visible, user_visible, \
977              tier, promotion_timestamp) \
978             VALUES (?, 'assistant', ?, '[]', 1, 0, 'semantic', unixepoch()) \
979             RETURNING id",
980        )
981        .bind(conversation_id)
982        .bind(merged_content)
983        .fetch_one(&mut *tx)
984        .await?;
985
986        let new_id = row.0;
987
988        // Soft-delete originals and reset qdrant_cleaned so eviction sweep removes vectors.
989        for &id in original_ids {
990            sqlx::query(
991                "UPDATE messages \
992                 SET deleted_at = datetime('now'), qdrant_cleaned = 0 \
993                 WHERE id = ? AND deleted_at IS NULL",
994            )
995            .bind(id)
996            .execute(&mut *tx)
997            .await?;
998        }
999
1000        tx.commit().await?;
1001        Ok(new_id)
1002    }
1003
1004    /// Manually promote a set of messages to semantic tier without merging.
1005    ///
1006    /// Sets `tier='semantic'` and `promotion_timestamp=unixepoch()` for the given IDs.
1007    /// Does NOT soft-delete the originals — use this for direct user-requested promotion.
1008    ///
1009    /// # Errors
1010    ///
1011    /// Returns an error if the update fails.
1012    pub async fn manual_promote(&self, ids: &[MessageId]) -> Result<usize, MemoryError> {
1013        if ids.is_empty() {
1014            return Ok(0);
1015        }
1016        let mut count = 0usize;
1017        for &id in ids {
1018            let result = sqlx::query(
1019                "UPDATE messages \
1020                 SET tier = 'semantic', promotion_timestamp = unixepoch() \
1021                 WHERE id = ? AND deleted_at IS NULL AND tier = 'episodic'",
1022            )
1023            .bind(id)
1024            .execute(&self.pool)
1025            .await?;
1026            count += usize::try_from(result.rows_affected()).unwrap_or(0);
1027        }
1028        Ok(count)
1029    }
1030
1031    /// Increment `session_count` for all episodic messages in a conversation.
1032    ///
1033    /// Called when a session restores an existing conversation to mark that messages
1034    /// were accessed in a new session. Only episodic (non-deleted) messages are updated.
1035    ///
1036    /// # Errors
1037    ///
1038    /// Returns an error if the database update fails.
1039    pub async fn increment_session_counts_for_conversation(
1040        &self,
1041        conversation_id: ConversationId,
1042    ) -> Result<(), MemoryError> {
1043        sqlx::query(
1044            "UPDATE messages SET session_count = session_count + 1 \
1045             WHERE conversation_id = ? AND tier = 'episodic' AND deleted_at IS NULL",
1046        )
1047        .bind(conversation_id)
1048        .execute(&self.pool)
1049        .await?;
1050        Ok(())
1051    }
1052
1053    /// Fetch the tier string for each of the given message IDs.
1054    ///
1055    /// Messages not found or already deleted are omitted from the result.
1056    ///
1057    /// # Errors
1058    ///
1059    /// Returns an error if the query fails.
1060    pub async fn fetch_tiers(
1061        &self,
1062        ids: &[MessageId],
1063    ) -> Result<std::collections::HashMap<MessageId, String>, MemoryError> {
1064        if ids.is_empty() {
1065            return Ok(std::collections::HashMap::new());
1066        }
1067        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1068        let query = format!(
1069            "SELECT id, tier FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
1070        );
1071        let mut q = sqlx::query_as::<_, (MessageId, String)>(&query);
1072        for &id in ids {
1073            q = q.bind(id);
1074        }
1075        let rows = q.fetch_all(&self.pool).await?;
1076        Ok(rows.into_iter().collect())
1077    }
1078}
1079
1080/// A candidate message for tier promotion, returned by [`SqliteStore::find_promotion_candidates`].
1081#[derive(Debug, Clone)]
1082pub struct PromotionCandidate {
1083    pub id: MessageId,
1084    pub conversation_id: ConversationId,
1085    pub content: String,
1086    pub session_count: u32,
1087    pub importance_score: f64,
1088}
1089
1090#[cfg(test)]
1091mod tests;