Skip to main content

zeph_memory/sqlite/messages/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
10/// Sanitize an arbitrary string into a valid FTS5 query.
11///
12/// Splits on non-alphanumeric characters, filters empty tokens, and joins
13/// with spaces. This strips FTS5 special characters (`"`, `*`, `(`, `)`,
14/// `^`, `-`, `+`, `:`) to prevent syntax errors in `MATCH` clauses.
15///
16/// Note: FTS5 boolean operators (AND, OR, NOT, NEAR) are preserved in their
17/// original case. Callers that need to prevent operator interpretation must
18/// filter these tokens separately (see `find_entities_fuzzy` in `graph/store.rs`).
19pub(crate) fn sanitize_fts5_query(query: &str) -> String {
20    query
21        .split(|c: char| !c.is_alphanumeric())
22        .filter(|t| !t.is_empty())
23        .collect::<Vec<_>>()
24        .join(" ")
25}
26
27fn parse_role(s: &str) -> Role {
28    match s {
29        "assistant" => Role::Assistant,
30        "system" => Role::System,
31        _ => Role::User,
32    }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37    match role {
38        Role::System => "system",
39        Role::User => "user",
40        Role::Assistant => "assistant",
41    }
42}
43
44/// Deserialize message parts from a stored JSON string.
45///
46/// Returns an empty `Vec` and logs a warning if deserialization fails, including the role and
47/// a truncated excerpt of the malformed JSON for diagnostics.
48fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
49    if parts_json == "[]" {
50        return vec![];
51    }
52    match serde_json::from_str(parts_json) {
53        Ok(p) => p,
54        Err(e) => {
55            let truncated = parts_json.chars().take(120).collect::<String>();
56            tracing::warn!(
57                role = %role_str,
58                parts_json = %truncated,
59                error = %e,
60                "failed to deserialize message parts, falling back to empty"
61            );
62            vec![]
63        }
64    }
65}
66
67impl SqliteStore {
68    /// Create a new conversation and return its ID.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the insert fails.
73    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
74        let row: (ConversationId,) =
75            sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
76                .fetch_one(&self.pool)
77                .await?;
78        Ok(row.0)
79    }
80
81    /// Save a message to the given conversation and return the message ID.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the insert fails.
86    pub async fn save_message(
87        &self,
88        conversation_id: ConversationId,
89        role: &str,
90        content: &str,
91    ) -> Result<MessageId, MemoryError> {
92        self.save_message_with_parts(conversation_id, role, content, "[]")
93            .await
94    }
95
96    /// Save a message with structured parts JSON.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the insert fails.
101    pub async fn save_message_with_parts(
102        &self,
103        conversation_id: ConversationId,
104        role: &str,
105        content: &str,
106        parts_json: &str,
107    ) -> Result<MessageId, MemoryError> {
108        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
109            .await
110    }
111
112    /// Save a message with visibility metadata.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the insert fails.
117    pub async fn save_message_with_metadata(
118        &self,
119        conversation_id: ConversationId,
120        role: &str,
121        content: &str,
122        parts_json: &str,
123        agent_visible: bool,
124        user_visible: bool,
125    ) -> Result<MessageId, MemoryError> {
126        let importance_score = crate::semantic::importance::compute_importance(content, role);
127        let row: (MessageId,) = sqlx::query_as(
128            "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
129             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
130        )
131        .bind(conversation_id)
132        .bind(role)
133        .bind(content)
134        .bind(parts_json)
135        .bind(i64::from(agent_visible))
136        .bind(i64::from(user_visible))
137        .bind(importance_score)
138        .fetch_one(&self.pool)
139        .await?;
140        Ok(row.0)
141    }
142
143    /// Load the most recent messages for a conversation, up to `limit`.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the query fails.
148    pub async fn load_history(
149        &self,
150        conversation_id: ConversationId,
151        limit: u32,
152    ) -> Result<Vec<Message>, MemoryError> {
153        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
154            "SELECT role, content, parts, agent_visible, user_visible FROM (\
155                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
156                WHERE conversation_id = ? AND deleted_at IS NULL \
157                ORDER BY id DESC \
158                LIMIT ?\
159             ) ORDER BY id ASC",
160        )
161        .bind(conversation_id)
162        .bind(limit)
163        .fetch_all(&self.pool)
164        .await?;
165
166        let messages = rows
167            .into_iter()
168            .map(
169                |(role_str, content, parts_json, agent_visible, user_visible)| {
170                    let parts = parse_parts_json(&role_str, &parts_json);
171                    Message {
172                        role: parse_role(&role_str),
173                        content,
174                        parts,
175                        metadata: MessageMetadata {
176                            agent_visible: agent_visible != 0,
177                            user_visible: user_visible != 0,
178                            compacted_at: None,
179                            deferred_summary: None,
180                            focus_pinned: false,
181                            focus_marker_id: None,
182                        },
183                    }
184                },
185            )
186            .collect();
187        Ok(messages)
188    }
189
190    /// Load messages filtered by visibility flags.
191    ///
192    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the query fails.
197    pub async fn load_history_filtered(
198        &self,
199        conversation_id: ConversationId,
200        limit: u32,
201        agent_visible: Option<bool>,
202        user_visible: Option<bool>,
203    ) -> Result<Vec<Message>, MemoryError> {
204        let av = agent_visible.map(i64::from);
205        let uv = user_visible.map(i64::from);
206
207        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
208            "WITH recent AS (\
209                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
210                WHERE conversation_id = ? \
211                  AND deleted_at IS NULL \
212                  AND (? IS NULL OR agent_visible = ?) \
213                  AND (? IS NULL OR user_visible = ?) \
214                ORDER BY id DESC \
215                LIMIT ?\
216             ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
217        )
218        .bind(conversation_id)
219        .bind(av)
220        .bind(av)
221        .bind(uv)
222        .bind(uv)
223        .bind(limit)
224        .fetch_all(&self.pool)
225        .await?;
226
227        let messages = rows
228            .into_iter()
229            .map(
230                |(role_str, content, parts_json, agent_visible, user_visible)| {
231                    let parts = parse_parts_json(&role_str, &parts_json);
232                    Message {
233                        role: parse_role(&role_str),
234                        content,
235                        parts,
236                        metadata: MessageMetadata {
237                            agent_visible: agent_visible != 0,
238                            user_visible: user_visible != 0,
239                            compacted_at: None,
240                            deferred_summary: None,
241                            focus_pinned: false,
242                            focus_marker_id: None,
243                        },
244                    }
245                },
246            )
247            .collect();
248        Ok(messages)
249    }
250
251    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
252    ///
253    /// Within a single transaction:
254    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
255    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
256    ///
257    /// Returns the `MessageId` of the inserted summary.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the transaction fails.
262    pub async fn replace_conversation(
263        &self,
264        conversation_id: ConversationId,
265        compacted_range: std::ops::RangeInclusive<MessageId>,
266        summary_role: &str,
267        summary_content: &str,
268    ) -> Result<MessageId, MemoryError> {
269        let now = {
270            let secs = std::time::SystemTime::now()
271                .duration_since(std::time::UNIX_EPOCH)
272                .unwrap_or_default()
273                .as_secs();
274            format!("{secs}")
275        };
276        let start_id = compacted_range.start().0;
277        let end_id = compacted_range.end().0;
278
279        let mut tx = self.pool.begin().await?;
280
281        sqlx::query(
282            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
283             WHERE conversation_id = ? AND id >= ? AND id <= ?",
284        )
285        .bind(&now)
286        .bind(conversation_id)
287        .bind(start_id)
288        .bind(end_id)
289        .execute(&mut *tx)
290        .await?;
291
292        // importance_score uses schema DEFAULT 0.5 (neutral); compaction summaries are not scored at write time.
293        let row: (MessageId,) = sqlx::query_as(
294            "INSERT INTO messages \
295             (conversation_id, role, content, parts, agent_visible, user_visible) \
296             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
297        )
298        .bind(conversation_id)
299        .bind(summary_role)
300        .bind(summary_content)
301        .fetch_one(&mut *tx)
302        .await?;
303
304        tx.commit().await?;
305
306        Ok(row.0)
307    }
308
309    /// Return the IDs of the N oldest messages in a conversation (ascending order).
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the query fails.
314    pub async fn oldest_message_ids(
315        &self,
316        conversation_id: ConversationId,
317        n: u32,
318    ) -> Result<Vec<MessageId>, MemoryError> {
319        let rows: Vec<(MessageId,)> = sqlx::query_as(
320            "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
321        )
322        .bind(conversation_id)
323        .bind(n)
324        .fetch_all(&self.pool)
325        .await?;
326        Ok(rows.into_iter().map(|r| r.0).collect())
327    }
328
329    /// Return the ID of the most recent conversation, if any.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the query fails.
334    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
335        let row: Option<(ConversationId,)> =
336            sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
337                .fetch_optional(&self.pool)
338                .await?;
339        Ok(row.map(|r| r.0))
340    }
341
342    /// Fetch a single message by its ID.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query fails.
347    pub async fn message_by_id(
348        &self,
349        message_id: MessageId,
350    ) -> Result<Option<Message>, MemoryError> {
351        let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
352            "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
353        )
354        .bind(message_id)
355        .fetch_optional(&self.pool)
356        .await?;
357
358        Ok(row.map(
359            |(role_str, content, parts_json, agent_visible, user_visible)| {
360                let parts = parse_parts_json(&role_str, &parts_json);
361                Message {
362                    role: parse_role(&role_str),
363                    content,
364                    parts,
365                    metadata: MessageMetadata {
366                        agent_visible: agent_visible != 0,
367                        user_visible: user_visible != 0,
368                        compacted_at: None,
369                        deferred_summary: None,
370                        focus_pinned: false,
371                        focus_marker_id: None,
372                    },
373                }
374            },
375        ))
376    }
377
378    /// Fetch messages by a list of IDs in a single query.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the query fails.
383    pub async fn messages_by_ids(
384        &self,
385        ids: &[MessageId],
386    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
387        if ids.is_empty() {
388            return Ok(Vec::new());
389        }
390
391        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
392
393        let query = format!(
394            "SELECT id, role, content, parts FROM messages \
395             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
396        );
397        let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
398        for &id in ids {
399            q = q.bind(id);
400        }
401
402        let rows = q.fetch_all(&self.pool).await?;
403
404        Ok(rows
405            .into_iter()
406            .map(|(id, role_str, content, parts_json)| {
407                let parts = parse_parts_json(&role_str, &parts_json);
408                (
409                    id,
410                    Message {
411                        role: parse_role(&role_str),
412                        content,
413                        parts,
414                        metadata: MessageMetadata::default(),
415                    },
416                )
417            })
418            .collect())
419    }
420
421    /// Return message IDs and content for messages without embeddings.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the query fails.
426    pub async fn unembedded_message_ids(
427        &self,
428        limit: Option<usize>,
429    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
430        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
431
432        let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
433            "SELECT m.id, m.conversation_id, m.role, m.content \
434             FROM messages m \
435             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
436             WHERE em.id IS NULL AND m.deleted_at IS NULL \
437             ORDER BY m.id ASC \
438             LIMIT ?",
439        )
440        .bind(effective_limit)
441        .fetch_all(&self.pool)
442        .await?;
443
444        Ok(rows)
445    }
446
447    /// Count the number of messages in a conversation.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the query fails.
452    pub async fn count_messages(
453        &self,
454        conversation_id: ConversationId,
455    ) -> Result<i64, MemoryError> {
456        let row: (i64,) = sqlx::query_as(
457            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
458        )
459        .bind(conversation_id)
460        .fetch_one(&self.pool)
461        .await?;
462        Ok(row.0)
463    }
464
465    /// Count messages in a conversation with id greater than `after_id`.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the query fails.
470    pub async fn count_messages_after(
471        &self,
472        conversation_id: ConversationId,
473        after_id: MessageId,
474    ) -> Result<i64, MemoryError> {
475        let row: (i64,) =
476            sqlx::query_as(
477                "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
478            )
479            .bind(conversation_id)
480            .bind(after_id)
481            .fetch_one(&self.pool)
482            .await?;
483        Ok(row.0)
484    }
485
486    /// Full-text keyword search over messages using FTS5.
487    ///
488    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
489    /// negated to positive for consistency with vector scores).
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the query fails.
494    pub async fn keyword_search(
495        &self,
496        query: &str,
497        limit: usize,
498        conversation_id: Option<ConversationId>,
499    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
500        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
501        let safe_query = sanitize_fts5_query(query);
502        if safe_query.is_empty() {
503            return Ok(Vec::new());
504        }
505
506        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
507            sqlx::query_as(
508                "SELECT m.id, -rank AS score \
509                 FROM messages_fts f \
510                 JOIN messages m ON m.id = f.rowid \
511                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
512                 ORDER BY rank \
513                 LIMIT ?",
514            )
515            .bind(&safe_query)
516            .bind(cid)
517            .bind(effective_limit)
518            .fetch_all(&self.pool)
519            .await?
520        } else {
521            sqlx::query_as(
522                "SELECT m.id, -rank AS score \
523                 FROM messages_fts f \
524                 JOIN messages m ON m.id = f.rowid \
525                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
526                 ORDER BY rank \
527                 LIMIT ?",
528            )
529            .bind(&safe_query)
530            .bind(effective_limit)
531            .fetch_all(&self.pool)
532            .await?
533        };
534
535        Ok(rows)
536    }
537
538    /// Full-text keyword search over messages using FTS5, filtered by a `created_at` time range.
539    ///
540    /// Used by the `Episodic` recall path to combine keyword matching with temporal filtering.
541    /// Temporal keywords are stripped from `query` by the caller before this method is invoked
542    /// (see `strip_temporal_keywords`) to prevent BM25 score distortion.
543    ///
544    /// `after` and `before` are `SQLite` datetime strings in `YYYY-MM-DD HH:MM:SS` format (UTC).
545    /// `None` means "no bound" on that side.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if the query fails.
550    pub async fn keyword_search_with_time_range(
551        &self,
552        query: &str,
553        limit: usize,
554        conversation_id: Option<ConversationId>,
555        after: Option<&str>,
556        before: Option<&str>,
557    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
558        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
559        let safe_query = sanitize_fts5_query(query);
560        if safe_query.is_empty() {
561            return Ok(Vec::new());
562        }
563
564        // Build time-range clauses dynamically. Both bounds are optional.
565        let after_clause = if after.is_some() {
566            " AND m.created_at > ?"
567        } else {
568            ""
569        };
570        let before_clause = if before.is_some() {
571            " AND m.created_at < ?"
572        } else {
573            ""
574        };
575        let conv_clause = if conversation_id.is_some() {
576            " AND m.conversation_id = ?"
577        } else {
578            ""
579        };
580
581        let sql = format!(
582            "SELECT m.id, -rank AS score \
583             FROM messages_fts f \
584             JOIN messages m ON m.id = f.rowid \
585             WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
586             {after_clause}{before_clause}{conv_clause} \
587             ORDER BY rank \
588             LIMIT ?"
589        );
590
591        let mut q = sqlx::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
592        if let Some(a) = after {
593            q = q.bind(a);
594        }
595        if let Some(b) = before {
596            q = q.bind(b);
597        }
598        if let Some(cid) = conversation_id {
599            q = q.bind(cid);
600        }
601        q = q.bind(effective_limit);
602
603        Ok(q.fetch_all(&self.pool).await?)
604    }
605
606    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
607    ///
608    /// Messages without a `created_at` column fall back to 0.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the query fails.
613    pub async fn message_timestamps(
614        &self,
615        ids: &[MessageId],
616    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
617        if ids.is_empty() {
618            return Ok(std::collections::HashMap::new());
619        }
620
621        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
622        let query = format!(
623            "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
624             FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
625        );
626        let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
627        for &id in ids {
628            q = q.bind(id);
629        }
630
631        let rows = q.fetch_all(&self.pool).await?;
632        Ok(rows.into_iter().collect())
633    }
634
635    /// Load a range of messages after a given message ID.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if the query fails.
640    pub async fn load_messages_range(
641        &self,
642        conversation_id: ConversationId,
643        after_message_id: MessageId,
644        limit: usize,
645    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
646        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
647
648        let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
649            "SELECT id, role, content FROM messages \
650             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
651             ORDER BY id ASC LIMIT ?",
652        )
653        .bind(conversation_id)
654        .bind(after_message_id)
655        .bind(effective_limit)
656        .fetch_all(&self.pool)
657        .await?;
658
659        Ok(rows)
660    }
661
662    // ── Eviction helpers ──────────────────────────────────────────────────────
663
664    /// Return all non-deleted message IDs with their eviction metadata.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the query fails.
669    pub async fn get_eviction_candidates(
670        &self,
671    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
672        let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
673            "SELECT id, created_at, last_accessed, access_count \
674             FROM messages WHERE deleted_at IS NULL",
675        )
676        .fetch_all(&self.pool)
677        .await?;
678
679        Ok(rows
680            .into_iter()
681            .map(
682                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
683                    id,
684                    created_at,
685                    last_accessed,
686                    access_count: access_count.try_into().unwrap_or(0),
687                },
688            )
689            .collect())
690    }
691
692    /// Soft-delete a set of messages by marking `deleted_at`.
693    ///
694    /// Soft-deleted messages are excluded from all history queries.
695    ///
696    /// # Errors
697    ///
698    /// Returns an error if the update fails.
699    pub async fn soft_delete_messages(
700        &self,
701        ids: &[MessageId],
702    ) -> Result<(), crate::error::MemoryError> {
703        if ids.is_empty() {
704            return Ok(());
705        }
706        // SQLite does not support array binding natively. Batch via individual updates.
707        for &id in ids {
708            sqlx::query(
709                "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
710            )
711            .bind(id)
712            .execute(&self.pool)
713            .await?;
714        }
715        Ok(())
716    }
717
718    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the query fails.
723    pub async fn get_soft_deleted_message_ids(
724        &self,
725    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
726        let rows: Vec<(MessageId,)> = sqlx::query_as(
727            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
728        )
729        .fetch_all(&self.pool)
730        .await?;
731        Ok(rows.into_iter().map(|(id,)| id).collect())
732    }
733
734    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the update fails.
739    pub async fn mark_qdrant_cleaned(
740        &self,
741        ids: &[MessageId],
742    ) -> Result<(), crate::error::MemoryError> {
743        for &id in ids {
744            sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
745                .bind(id)
746                .execute(&self.pool)
747                .await?;
748        }
749        Ok(())
750    }
751
752    /// Fetch `importance_score` values for the given message IDs.
753    ///
754    /// Messages missing from the table fall back to 0.5 (neutral) and are omitted from the map.
755    ///
756    /// # Errors
757    ///
758    /// Returns an error if the query fails.
759    pub async fn fetch_importance_scores(
760        &self,
761        ids: &[MessageId],
762    ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
763        if ids.is_empty() {
764            return Ok(std::collections::HashMap::new());
765        }
766        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
767        let query = format!(
768            "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
769        );
770        let mut q = sqlx::query_as::<_, (MessageId, f64)>(&query);
771        for &id in ids {
772            q = q.bind(id);
773        }
774        let rows = q.fetch_all(&self.pool).await?;
775        Ok(rows.into_iter().collect())
776    }
777
778    /// Increment `access_count` and set `last_accessed = datetime('now')` for the given IDs.
779    ///
780    /// Skips the update when `ids` is empty.
781    ///
782    /// # Errors
783    ///
784    /// Returns an error if the update fails.
785    pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
786        if ids.is_empty() {
787            return Ok(());
788        }
789        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
790        let query = format!(
791            "UPDATE messages SET access_count = access_count + 1, last_accessed = datetime('now') \
792             WHERE id IN ({placeholders})"
793        );
794        let mut q = sqlx::query(&query);
795        for &id in ids {
796            q = q.bind(id);
797        }
798        q.execute(&self.pool).await?;
799        Ok(())
800    }
801}
802
803#[cfg(test)]
804mod tests;