Skip to main content

zeph_memory/sqlite/
messages.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Turn arbitrary user text into a string that is safe to pass to an FTS5
/// `MATCH` clause.
///
/// Every maximal run of alphanumeric characters becomes one token, and the
/// tokens are joined with single spaces. Anything else — including the FTS5
/// metacharacters `"`, `*`, `(`, `)`, `^`, `-`, `+`, `:` — acts as a separator
/// and is dropped, so the result can never contain FTS5 syntax punctuation.
/// Input with no alphanumeric characters yields an empty string.
///
/// Note: the FTS5 boolean keywords (AND, OR, NOT, NEAR) consist only of
/// letters, so they survive unchanged and in their original case. Callers that
/// must not let them act as operators have to filter or quote those tokens
/// themselves (see `find_entities_fuzzy` in `graph/store.rs`).
pub(crate) fn sanitize_fts5_query(query: &str) -> String {
    let mut sanitized = String::with_capacity(query.len());
    let tokens = query
        .split(|ch: char| !ch.is_alphanumeric())
        .filter(|token| !token.is_empty());
    for token in tokens {
        // Tokens are never empty, so a non-empty buffer means "not the first".
        if !sanitized.is_empty() {
            sanitized.push(' ');
        }
        sanitized.push_str(token);
    }
    sanitized
}
26
27fn parse_role(s: &str) -> Role {
28    match s {
29        "assistant" => Role::Assistant,
30        "system" => Role::System,
31        _ => Role::User,
32    }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37    match role {
38        Role::System => "system",
39        Role::User => "user",
40        Role::Assistant => "assistant",
41    }
42}
43
44impl SqliteStore {
45    /// Create a new conversation and return its ID.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the insert fails.
50    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
51        let row: (ConversationId,) =
52            sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
53                .fetch_one(&self.pool)
54                .await?;
55        Ok(row.0)
56    }
57
58    /// Save a message to the given conversation and return the message ID.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if the insert fails.
63    pub async fn save_message(
64        &self,
65        conversation_id: ConversationId,
66        role: &str,
67        content: &str,
68    ) -> Result<MessageId, MemoryError> {
69        self.save_message_with_parts(conversation_id, role, content, "[]")
70            .await
71    }
72
73    /// Save a message with structured parts JSON.
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the insert fails.
78    pub async fn save_message_with_parts(
79        &self,
80        conversation_id: ConversationId,
81        role: &str,
82        content: &str,
83        parts_json: &str,
84    ) -> Result<MessageId, MemoryError> {
85        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
86            .await
87    }
88
89    /// Save a message with visibility metadata.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the insert fails.
94    pub async fn save_message_with_metadata(
95        &self,
96        conversation_id: ConversationId,
97        role: &str,
98        content: &str,
99        parts_json: &str,
100        agent_visible: bool,
101        user_visible: bool,
102    ) -> Result<MessageId, MemoryError> {
103        let row: (MessageId,) = sqlx::query_as(
104            "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible) \
105             VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
106        )
107        .bind(conversation_id)
108        .bind(role)
109        .bind(content)
110        .bind(parts_json)
111        .bind(i64::from(agent_visible))
112        .bind(i64::from(user_visible))
113        .fetch_one(&self.pool)
114        .await?;
115        Ok(row.0)
116    }
117
118    /// Load the most recent messages for a conversation, up to `limit`.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the query fails.
123    pub async fn load_history(
124        &self,
125        conversation_id: ConversationId,
126        limit: u32,
127    ) -> Result<Vec<Message>, MemoryError> {
128        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
129            "SELECT role, content, parts, agent_visible, user_visible FROM (\
130                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
131                WHERE conversation_id = ? AND deleted_at IS NULL \
132                ORDER BY id DESC \
133                LIMIT ?\
134             ) ORDER BY id ASC",
135        )
136        .bind(conversation_id)
137        .bind(limit)
138        .fetch_all(&self.pool)
139        .await?;
140
141        let messages = rows
142            .into_iter()
143            .map(
144                |(role_str, content, parts_json, agent_visible, user_visible)| {
145                    let parts: Vec<MessagePart> = if parts_json == "[]" {
146                        vec![]
147                    } else {
148                        serde_json::from_str(&parts_json).unwrap_or_default()
149                    };
150                    Message {
151                        role: parse_role(&role_str),
152                        content,
153                        parts,
154                        metadata: MessageMetadata {
155                            agent_visible: agent_visible != 0,
156                            user_visible: user_visible != 0,
157                            compacted_at: None,
158                        },
159                    }
160                },
161            )
162            .collect();
163        Ok(messages)
164    }
165
166    /// Load messages filtered by visibility flags.
167    ///
168    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if the query fails.
173    pub async fn load_history_filtered(
174        &self,
175        conversation_id: ConversationId,
176        limit: u32,
177        agent_visible: Option<bool>,
178        user_visible: Option<bool>,
179    ) -> Result<Vec<Message>, MemoryError> {
180        let av = agent_visible.map(i64::from);
181        let uv = user_visible.map(i64::from);
182
183        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
184            "WITH recent AS (\
185                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
186                WHERE conversation_id = ? \
187                  AND deleted_at IS NULL \
188                  AND (? IS NULL OR agent_visible = ?) \
189                  AND (? IS NULL OR user_visible = ?) \
190                ORDER BY id DESC \
191                LIMIT ?\
192             ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
193        )
194        .bind(conversation_id)
195        .bind(av)
196        .bind(av)
197        .bind(uv)
198        .bind(uv)
199        .bind(limit)
200        .fetch_all(&self.pool)
201        .await?;
202
203        let messages = rows
204            .into_iter()
205            .map(
206                |(role_str, content, parts_json, agent_visible, user_visible)| {
207                    let parts: Vec<MessagePart> = if parts_json == "[]" {
208                        vec![]
209                    } else {
210                        serde_json::from_str(&parts_json).unwrap_or_default()
211                    };
212                    Message {
213                        role: parse_role(&role_str),
214                        content,
215                        parts,
216                        metadata: MessageMetadata {
217                            agent_visible: agent_visible != 0,
218                            user_visible: user_visible != 0,
219                            compacted_at: None,
220                        },
221                    }
222                },
223            )
224            .collect();
225        Ok(messages)
226    }
227
228    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
229    ///
230    /// Within a single transaction:
231    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
232    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
233    ///
234    /// Returns the `MessageId` of the inserted summary.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the transaction fails.
239    pub async fn replace_conversation(
240        &self,
241        conversation_id: ConversationId,
242        compacted_range: std::ops::RangeInclusive<MessageId>,
243        summary_role: &str,
244        summary_content: &str,
245    ) -> Result<MessageId, MemoryError> {
246        let now = {
247            let secs = std::time::SystemTime::now()
248                .duration_since(std::time::UNIX_EPOCH)
249                .unwrap_or_default()
250                .as_secs();
251            format!("{secs}")
252        };
253        let start_id = compacted_range.start().0;
254        let end_id = compacted_range.end().0;
255
256        let mut tx = self.pool.begin().await?;
257
258        sqlx::query(
259            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
260             WHERE conversation_id = ? AND id >= ? AND id <= ?",
261        )
262        .bind(&now)
263        .bind(conversation_id)
264        .bind(start_id)
265        .bind(end_id)
266        .execute(&mut *tx)
267        .await?;
268
269        let row: (MessageId,) = sqlx::query_as(
270            "INSERT INTO messages \
271             (conversation_id, role, content, parts, agent_visible, user_visible) \
272             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
273        )
274        .bind(conversation_id)
275        .bind(summary_role)
276        .bind(summary_content)
277        .fetch_one(&mut *tx)
278        .await?;
279
280        tx.commit().await?;
281
282        Ok(row.0)
283    }
284
285    /// Return the IDs of the N oldest messages in a conversation (ascending order).
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if the query fails.
290    pub async fn oldest_message_ids(
291        &self,
292        conversation_id: ConversationId,
293        n: u32,
294    ) -> Result<Vec<MessageId>, MemoryError> {
295        let rows: Vec<(MessageId,)> = sqlx::query_as(
296            "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
297        )
298        .bind(conversation_id)
299        .bind(n)
300        .fetch_all(&self.pool)
301        .await?;
302        Ok(rows.into_iter().map(|r| r.0).collect())
303    }
304
305    /// Return the ID of the most recent conversation, if any.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the query fails.
310    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
311        let row: Option<(ConversationId,)> =
312            sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
313                .fetch_optional(&self.pool)
314                .await?;
315        Ok(row.map(|r| r.0))
316    }
317
318    /// Fetch a single message by its ID.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if the query fails.
323    pub async fn message_by_id(
324        &self,
325        message_id: MessageId,
326    ) -> Result<Option<Message>, MemoryError> {
327        let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
328            "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
329        )
330        .bind(message_id)
331        .fetch_optional(&self.pool)
332        .await?;
333
334        Ok(row.map(
335            |(role_str, content, parts_json, agent_visible, user_visible)| {
336                let parts: Vec<MessagePart> = if parts_json == "[]" {
337                    vec![]
338                } else {
339                    serde_json::from_str(&parts_json).unwrap_or_default()
340                };
341                Message {
342                    role: parse_role(&role_str),
343                    content,
344                    parts,
345                    metadata: MessageMetadata {
346                        agent_visible: agent_visible != 0,
347                        user_visible: user_visible != 0,
348                        compacted_at: None,
349                    },
350                }
351            },
352        ))
353    }
354
355    /// Fetch messages by a list of IDs in a single query.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if the query fails.
360    pub async fn messages_by_ids(
361        &self,
362        ids: &[MessageId],
363    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
364        if ids.is_empty() {
365            return Ok(Vec::new());
366        }
367
368        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
369
370        let query = format!(
371            "SELECT id, role, content, parts FROM messages \
372             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
373        );
374        let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
375        for &id in ids {
376            q = q.bind(id);
377        }
378
379        let rows = q.fetch_all(&self.pool).await?;
380
381        Ok(rows
382            .into_iter()
383            .map(|(id, role_str, content, parts_json)| {
384                let parts: Vec<MessagePart> = if parts_json == "[]" {
385                    vec![]
386                } else {
387                    serde_json::from_str(&parts_json).unwrap_or_default()
388                };
389                (
390                    id,
391                    Message {
392                        role: parse_role(&role_str),
393                        content,
394                        parts,
395                        metadata: MessageMetadata::default(),
396                    },
397                )
398            })
399            .collect())
400    }
401
402    /// Return message IDs and content for messages without embeddings.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the query fails.
407    pub async fn unembedded_message_ids(
408        &self,
409        limit: Option<usize>,
410    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
411        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
412
413        let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
414            "SELECT m.id, m.conversation_id, m.role, m.content \
415             FROM messages m \
416             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
417             WHERE em.id IS NULL AND m.deleted_at IS NULL \
418             ORDER BY m.id ASC \
419             LIMIT ?",
420        )
421        .bind(effective_limit)
422        .fetch_all(&self.pool)
423        .await?;
424
425        Ok(rows)
426    }
427
428    /// Count the number of messages in a conversation.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the query fails.
433    pub async fn count_messages(
434        &self,
435        conversation_id: ConversationId,
436    ) -> Result<i64, MemoryError> {
437        let row: (i64,) = sqlx::query_as(
438            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
439        )
440        .bind(conversation_id)
441        .fetch_one(&self.pool)
442        .await?;
443        Ok(row.0)
444    }
445
446    /// Count messages in a conversation with id greater than `after_id`.
447    ///
448    /// # Errors
449    ///
450    /// Returns an error if the query fails.
451    pub async fn count_messages_after(
452        &self,
453        conversation_id: ConversationId,
454        after_id: MessageId,
455    ) -> Result<i64, MemoryError> {
456        let row: (i64,) =
457            sqlx::query_as(
458                "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
459            )
460            .bind(conversation_id)
461            .bind(after_id)
462            .fetch_one(&self.pool)
463            .await?;
464        Ok(row.0)
465    }
466
467    /// Full-text keyword search over messages using FTS5.
468    ///
469    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
470    /// negated to positive for consistency with vector scores).
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the query fails.
475    pub async fn keyword_search(
476        &self,
477        query: &str,
478        limit: usize,
479        conversation_id: Option<ConversationId>,
480    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
481        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
482        let safe_query = sanitize_fts5_query(query);
483        if safe_query.is_empty() {
484            return Ok(Vec::new());
485        }
486
487        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
488            sqlx::query_as(
489                "SELECT m.id, -rank AS score \
490                 FROM messages_fts f \
491                 JOIN messages m ON m.id = f.rowid \
492                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
493                 ORDER BY rank \
494                 LIMIT ?",
495            )
496            .bind(&safe_query)
497            .bind(cid)
498            .bind(effective_limit)
499            .fetch_all(&self.pool)
500            .await?
501        } else {
502            sqlx::query_as(
503                "SELECT m.id, -rank AS score \
504                 FROM messages_fts f \
505                 JOIN messages m ON m.id = f.rowid \
506                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
507                 ORDER BY rank \
508                 LIMIT ?",
509            )
510            .bind(&safe_query)
511            .bind(effective_limit)
512            .fetch_all(&self.pool)
513            .await?
514        };
515
516        Ok(rows)
517    }
518
519    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
520    ///
521    /// Messages without a `created_at` column fall back to 0.
522    ///
523    /// # Errors
524    ///
525    /// Returns an error if the query fails.
526    pub async fn message_timestamps(
527        &self,
528        ids: &[MessageId],
529    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
530        if ids.is_empty() {
531            return Ok(std::collections::HashMap::new());
532        }
533
534        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
535        let query = format!(
536            "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
537             FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
538        );
539        let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
540        for &id in ids {
541            q = q.bind(id);
542        }
543
544        let rows = q.fetch_all(&self.pool).await?;
545        Ok(rows.into_iter().collect())
546    }
547
548    /// Load a range of messages after a given message ID.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the query fails.
553    pub async fn load_messages_range(
554        &self,
555        conversation_id: ConversationId,
556        after_message_id: MessageId,
557        limit: usize,
558    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
559        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
560
561        let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
562            "SELECT id, role, content FROM messages \
563             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
564             ORDER BY id ASC LIMIT ?",
565        )
566        .bind(conversation_id)
567        .bind(after_message_id)
568        .bind(effective_limit)
569        .fetch_all(&self.pool)
570        .await?;
571
572        Ok(rows)
573    }
574
575    // ── Eviction helpers ──────────────────────────────────────────────────────
576
577    /// Return all non-deleted message IDs with their eviction metadata.
578    ///
579    /// # Errors
580    ///
581    /// Returns an error if the query fails.
582    pub async fn get_eviction_candidates(
583        &self,
584    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
585        let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
586            "SELECT id, created_at, last_accessed, access_count \
587             FROM messages WHERE deleted_at IS NULL",
588        )
589        .fetch_all(&self.pool)
590        .await?;
591
592        Ok(rows
593            .into_iter()
594            .map(
595                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
596                    id,
597                    created_at,
598                    last_accessed,
599                    access_count: access_count.try_into().unwrap_or(0),
600                },
601            )
602            .collect())
603    }
604
605    /// Soft-delete a set of messages by marking `deleted_at`.
606    ///
607    /// Soft-deleted messages are excluded from all history queries.
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if the update fails.
612    pub async fn soft_delete_messages(
613        &self,
614        ids: &[MessageId],
615    ) -> Result<(), crate::error::MemoryError> {
616        if ids.is_empty() {
617            return Ok(());
618        }
619        // SQLite does not support array binding natively. Batch via individual updates.
620        for &id in ids {
621            sqlx::query(
622                "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
623            )
624            .bind(id)
625            .execute(&self.pool)
626            .await?;
627        }
628        Ok(())
629    }
630
631    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the query fails.
636    pub async fn get_soft_deleted_message_ids(
637        &self,
638    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
639        let rows: Vec<(MessageId,)> = sqlx::query_as(
640            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
641        )
642        .fetch_all(&self.pool)
643        .await?;
644        Ok(rows.into_iter().map(|(id,)| id).collect())
645    }
646
647    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
648    ///
649    /// # Errors
650    ///
651    /// Returns an error if the update fails.
652    pub async fn mark_qdrant_cleaned(
653        &self,
654        ids: &[MessageId],
655    ) -> Result<(), crate::error::MemoryError> {
656        for &id in ids {
657            sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
658                .bind(id)
659                .execute(&self.pool)
660                .await?;
661        }
662        Ok(())
663    }
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    async fn test_store() -> SqliteStore {
671        SqliteStore::new(":memory:").await.unwrap()
672    }
673
674    #[tokio::test]
675    async fn create_conversation_returns_id() {
676        let store = test_store().await;
677        let id1 = store.create_conversation().await.unwrap();
678        let id2 = store.create_conversation().await.unwrap();
679        assert_eq!(id1, ConversationId(1));
680        assert_eq!(id2, ConversationId(2));
681    }
682
683    #[tokio::test]
684    async fn save_and_load_messages() {
685        let store = test_store().await;
686        let cid = store.create_conversation().await.unwrap();
687
688        let msg_id1 = store.save_message(cid, "user", "hello").await.unwrap();
689        let msg_id2 = store
690            .save_message(cid, "assistant", "hi there")
691            .await
692            .unwrap();
693
694        assert_eq!(msg_id1, MessageId(1));
695        assert_eq!(msg_id2, MessageId(2));
696
697        let history = store.load_history(cid, 50).await.unwrap();
698        assert_eq!(history.len(), 2);
699        assert_eq!(history[0].role, Role::User);
700        assert_eq!(history[0].content, "hello");
701        assert_eq!(history[1].role, Role::Assistant);
702        assert_eq!(history[1].content, "hi there");
703    }
704
705    #[tokio::test]
706    async fn load_history_respects_limit() {
707        let store = test_store().await;
708        let cid = store.create_conversation().await.unwrap();
709
710        for i in 0..10 {
711            store
712                .save_message(cid, "user", &format!("msg {i}"))
713                .await
714                .unwrap();
715        }
716
717        let history = store.load_history(cid, 3).await.unwrap();
718        assert_eq!(history.len(), 3);
719        assert_eq!(history[0].content, "msg 7");
720        assert_eq!(history[1].content, "msg 8");
721        assert_eq!(history[2].content, "msg 9");
722    }
723
724    #[tokio::test]
725    async fn latest_conversation_id_empty() {
726        let store = test_store().await;
727        assert!(store.latest_conversation_id().await.unwrap().is_none());
728    }
729
730    #[tokio::test]
731    async fn latest_conversation_id_returns_newest() {
732        let store = test_store().await;
733        store.create_conversation().await.unwrap();
734        let id2 = store.create_conversation().await.unwrap();
735        assert_eq!(store.latest_conversation_id().await.unwrap(), Some(id2));
736    }
737
738    #[tokio::test]
739    async fn messages_isolated_per_conversation() {
740        let store = test_store().await;
741        let cid1 = store.create_conversation().await.unwrap();
742        let cid2 = store.create_conversation().await.unwrap();
743
744        store.save_message(cid1, "user", "conv1").await.unwrap();
745        store.save_message(cid2, "user", "conv2").await.unwrap();
746
747        let h1 = store.load_history(cid1, 50).await.unwrap();
748        let h2 = store.load_history(cid2, 50).await.unwrap();
749        assert_eq!(h1.len(), 1);
750        assert_eq!(h1[0].content, "conv1");
751        assert_eq!(h2.len(), 1);
752        assert_eq!(h2[0].content, "conv2");
753    }
754
755    #[tokio::test]
756    async fn pool_accessor_returns_valid_pool() {
757        let store = test_store().await;
758        let pool = store.pool();
759        let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(pool).await.unwrap();
760        assert_eq!(row.0, 1);
761    }
762
763    #[tokio::test]
764    async fn embeddings_metadata_table_exists() {
765        let store = test_store().await;
766        let result: (i64,) = sqlx::query_as(
767            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embeddings_metadata'",
768        )
769        .fetch_one(store.pool())
770        .await
771        .unwrap();
772        assert_eq!(result.0, 1);
773    }
774
775    #[tokio::test]
776    async fn cascade_delete_removes_embeddings_metadata() {
777        let store = test_store().await;
778        let pool = store.pool();
779
780        let cid = store.create_conversation().await.unwrap();
781        let msg_id = store.save_message(cid, "user", "test").await.unwrap();
782
783        let point_id = uuid::Uuid::new_v4().to_string();
784        sqlx::query(
785            "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
786             VALUES (?, ?, ?)",
787        )
788        .bind(msg_id)
789        .bind(&point_id)
790        .bind(768_i64)
791        .execute(pool)
792        .await
793        .unwrap();
794
795        let before: (i64,) =
796            sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
797                .bind(msg_id)
798                .fetch_one(pool)
799                .await
800                .unwrap();
801        assert_eq!(before.0, 1);
802
803        sqlx::query("DELETE FROM messages WHERE id = ?")
804            .bind(msg_id)
805            .execute(pool)
806            .await
807            .unwrap();
808
809        let after: (i64,) =
810            sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
811                .bind(msg_id)
812                .fetch_one(pool)
813                .await
814                .unwrap();
815        assert_eq!(after.0, 0);
816    }
817
818    #[tokio::test]
819    async fn messages_by_ids_batch_fetch() {
820        let store = test_store().await;
821        let cid = store.create_conversation().await.unwrap();
822        let id1 = store.save_message(cid, "user", "hello").await.unwrap();
823        let id2 = store.save_message(cid, "assistant", "hi").await.unwrap();
824        let _id3 = store.save_message(cid, "user", "bye").await.unwrap();
825
826        let results = store.messages_by_ids(&[id1, id2]).await.unwrap();
827        assert_eq!(results.len(), 2);
828        assert_eq!(results[0].0, id1);
829        assert_eq!(results[0].1.content, "hello");
830        assert_eq!(results[1].0, id2);
831        assert_eq!(results[1].1.content, "hi");
832    }
833
834    #[tokio::test]
835    async fn messages_by_ids_empty_input() {
836        let store = test_store().await;
837        let results = store.messages_by_ids(&[]).await.unwrap();
838        assert!(results.is_empty());
839    }
840
841    #[tokio::test]
842    async fn messages_by_ids_nonexistent() {
843        let store = test_store().await;
844        let results = store
845            .messages_by_ids(&[MessageId(999), MessageId(1000)])
846            .await
847            .unwrap();
848        assert!(results.is_empty());
849    }
850
851    #[tokio::test]
852    async fn message_by_id_fetches_existing() {
853        let store = test_store().await;
854        let cid = store.create_conversation().await.unwrap();
855        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
856
857        let msg = store.message_by_id(msg_id).await.unwrap();
858        assert!(msg.is_some());
859        let msg = msg.unwrap();
860        assert_eq!(msg.role, Role::User);
861        assert_eq!(msg.content, "hello");
862    }
863
864    #[tokio::test]
865    async fn message_by_id_returns_none_for_nonexistent() {
866        let store = test_store().await;
867        let msg = store.message_by_id(MessageId(999)).await.unwrap();
868        assert!(msg.is_none());
869    }
870
871    #[tokio::test]
872    async fn unembedded_message_ids_returns_all_when_none_embedded() {
873        let store = test_store().await;
874        let cid = store.create_conversation().await.unwrap();
875
876        store.save_message(cid, "user", "msg1").await.unwrap();
877        store.save_message(cid, "assistant", "msg2").await.unwrap();
878
879        let unembedded = store.unembedded_message_ids(None).await.unwrap();
880        assert_eq!(unembedded.len(), 2);
881        assert_eq!(unembedded[0].3, "msg1");
882        assert_eq!(unembedded[1].3, "msg2");
883    }
884
885    #[tokio::test]
886    async fn unembedded_message_ids_excludes_embedded() {
887        let store = test_store().await;
888        let pool = store.pool();
889        let cid = store.create_conversation().await.unwrap();
890
891        let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
892        let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
893
894        let point_id = uuid::Uuid::new_v4().to_string();
895        sqlx::query(
896            "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
897             VALUES (?, ?, ?)",
898        )
899        .bind(msg_id1)
900        .bind(&point_id)
901        .bind(768_i64)
902        .execute(pool)
903        .await
904        .unwrap();
905
906        let unembedded = store.unembedded_message_ids(None).await.unwrap();
907        assert_eq!(unembedded.len(), 1);
908        assert_eq!(unembedded[0].0, msg_id2);
909        assert_eq!(unembedded[0].3, "msg2");
910    }
911
912    #[tokio::test]
913    async fn unembedded_message_ids_respects_limit() {
914        let store = test_store().await;
915        let cid = store.create_conversation().await.unwrap();
916
917        for i in 0..10 {
918            store
919                .save_message(cid, "user", &format!("msg{i}"))
920                .await
921                .unwrap();
922        }
923
924        let unembedded = store.unembedded_message_ids(Some(3)).await.unwrap();
925        assert_eq!(unembedded.len(), 3);
926    }
927
928    #[tokio::test]
929    async fn count_messages_returns_correct_count() {
930        let store = test_store().await;
931        let cid = store.create_conversation().await.unwrap();
932
933        assert_eq!(store.count_messages(cid).await.unwrap(), 0);
934
935        store.save_message(cid, "user", "msg1").await.unwrap();
936        store.save_message(cid, "assistant", "msg2").await.unwrap();
937
938        assert_eq!(store.count_messages(cid).await.unwrap(), 2);
939    }
940
941    #[tokio::test]
942    async fn count_messages_after_filters_correctly() {
943        let store = test_store().await;
944        let cid = store.create_conversation().await.unwrap();
945
946        let id1 = store.save_message(cid, "user", "msg1").await.unwrap();
947        let _id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
948        let id3 = store.save_message(cid, "user", "msg3").await.unwrap();
949
950        assert_eq!(
951            store.count_messages_after(cid, MessageId(0)).await.unwrap(),
952            3
953        );
954        assert_eq!(store.count_messages_after(cid, id1).await.unwrap(), 2);
955        assert_eq!(store.count_messages_after(cid, id3).await.unwrap(), 0);
956    }
957
958    #[tokio::test]
959    async fn load_messages_range_basic() {
960        let store = test_store().await;
961        let cid = store.create_conversation().await.unwrap();
962
963        let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
964        let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
965        let msg_id3 = store.save_message(cid, "user", "msg3").await.unwrap();
966
967        let msgs = store.load_messages_range(cid, msg_id1, 10).await.unwrap();
968        assert_eq!(msgs.len(), 2);
969        assert_eq!(msgs[0].0, msg_id2);
970        assert_eq!(msgs[0].2, "msg2");
971        assert_eq!(msgs[1].0, msg_id3);
972        assert_eq!(msgs[1].2, "msg3");
973    }
974
975    #[tokio::test]
976    async fn load_messages_range_respects_limit() {
977        let store = test_store().await;
978        let cid = store.create_conversation().await.unwrap();
979
980        store.save_message(cid, "user", "msg1").await.unwrap();
981        store.save_message(cid, "assistant", "msg2").await.unwrap();
982        store.save_message(cid, "user", "msg3").await.unwrap();
983
984        let msgs = store
985            .load_messages_range(cid, MessageId(0), 2)
986            .await
987            .unwrap();
988        assert_eq!(msgs.len(), 2);
989    }
990
991    #[tokio::test]
992    async fn keyword_search_basic() {
993        let store = test_store().await;
994        let cid = store.create_conversation().await.unwrap();
995
996        store
997            .save_message(cid, "user", "rust programming language")
998            .await
999            .unwrap();
1000        store
1001            .save_message(cid, "assistant", "python is great too")
1002            .await
1003            .unwrap();
1004        store
1005            .save_message(cid, "user", "I love rust and cargo")
1006            .await
1007            .unwrap();
1008
1009        let results = store.keyword_search("rust", 10, None).await.unwrap();
1010        assert_eq!(results.len(), 2);
1011        assert!(results.iter().all(|(_, score)| *score > 0.0));
1012    }
1013
1014    #[tokio::test]
1015    async fn keyword_search_with_conversation_filter() {
1016        let store = test_store().await;
1017        let cid1 = store.create_conversation().await.unwrap();
1018        let cid2 = store.create_conversation().await.unwrap();
1019
1020        store
1021            .save_message(cid1, "user", "hello world")
1022            .await
1023            .unwrap();
1024        store
1025            .save_message(cid2, "user", "hello universe")
1026            .await
1027            .unwrap();
1028
1029        let results = store.keyword_search("hello", 10, Some(cid1)).await.unwrap();
1030        assert_eq!(results.len(), 1);
1031    }
1032
1033    #[tokio::test]
1034    async fn keyword_search_no_match() {
1035        let store = test_store().await;
1036        let cid = store.create_conversation().await.unwrap();
1037
1038        store
1039            .save_message(cid, "user", "hello world")
1040            .await
1041            .unwrap();
1042
1043        let results = store.keyword_search("nonexistent", 10, None).await.unwrap();
1044        assert!(results.is_empty());
1045    }
1046
1047    #[tokio::test]
1048    async fn keyword_search_respects_limit() {
1049        let store = test_store().await;
1050        let cid = store.create_conversation().await.unwrap();
1051
1052        for i in 0..10 {
1053            store
1054                .save_message(cid, "user", &format!("test message {i}"))
1055                .await
1056                .unwrap();
1057        }
1058
1059        let results = store.keyword_search("test", 3, None).await.unwrap();
1060        assert_eq!(results.len(), 3);
1061    }
1062
1063    #[test]
1064    fn sanitize_fts5_query_strips_special_chars() {
1065        assert_eq!(sanitize_fts5_query("skill-audit"), "skill audit");
1066        assert_eq!(sanitize_fts5_query("hello, world"), "hello world");
1067        assert_eq!(sanitize_fts5_query("a+b*c^d"), "a b c d");
1068        assert_eq!(sanitize_fts5_query("  "), "");
1069        assert_eq!(sanitize_fts5_query("rust programming"), "rust programming");
1070    }
1071
1072    #[tokio::test]
1073    async fn keyword_search_with_special_chars_does_not_error() {
1074        let store = test_store().await;
1075        let cid = store.create_conversation().await.unwrap();
1076        store
1077            .save_message(cid, "user", "skill audit info")
1078            .await
1079            .unwrap();
1080        // query with comma and special chars — previously caused FTS5 syntax error
1081        // result may be empty; important is that no error is returned
1082        store
1083            .keyword_search("skill-audit, confidence=0.1", 10, None)
1084            .await
1085            .unwrap();
1086    }
1087
1088    #[tokio::test]
1089    async fn save_message_with_metadata_stores_visibility() {
1090        let store = test_store().await;
1091        let cid = store.create_conversation().await.unwrap();
1092
1093        let id = store
1094            .save_message_with_metadata(cid, "user", "hello", "[]", false, true)
1095            .await
1096            .unwrap();
1097
1098        let history = store.load_history(cid, 10).await.unwrap();
1099        assert_eq!(history.len(), 1);
1100        assert!(!history[0].metadata.agent_visible);
1101        assert!(history[0].metadata.user_visible);
1102        assert_eq!(id, MessageId(1));
1103    }
1104
1105    #[tokio::test]
1106    async fn load_history_filtered_by_agent_visible() {
1107        let store = test_store().await;
1108        let cid = store.create_conversation().await.unwrap();
1109
1110        store
1111            .save_message_with_metadata(cid, "user", "visible to agent", "[]", true, true)
1112            .await
1113            .unwrap();
1114        store
1115            .save_message_with_metadata(cid, "user", "user only", "[]", false, true)
1116            .await
1117            .unwrap();
1118
1119        let agent_msgs = store
1120            .load_history_filtered(cid, 50, Some(true), None)
1121            .await
1122            .unwrap();
1123        assert_eq!(agent_msgs.len(), 1);
1124        assert_eq!(agent_msgs[0].content, "visible to agent");
1125    }
1126
1127    #[tokio::test]
1128    async fn load_history_filtered_by_user_visible() {
1129        let store = test_store().await;
1130        let cid = store.create_conversation().await.unwrap();
1131
1132        store
1133            .save_message_with_metadata(cid, "system", "agent only summary", "[]", true, false)
1134            .await
1135            .unwrap();
1136        store
1137            .save_message_with_metadata(cid, "user", "user sees this", "[]", true, true)
1138            .await
1139            .unwrap();
1140
1141        let user_msgs = store
1142            .load_history_filtered(cid, 50, None, Some(true))
1143            .await
1144            .unwrap();
1145        assert_eq!(user_msgs.len(), 1);
1146        assert_eq!(user_msgs[0].content, "user sees this");
1147    }
1148
1149    #[tokio::test]
1150    async fn load_history_filtered_no_filter_returns_all() {
1151        let store = test_store().await;
1152        let cid = store.create_conversation().await.unwrap();
1153
1154        store
1155            .save_message_with_metadata(cid, "user", "msg1", "[]", true, false)
1156            .await
1157            .unwrap();
1158        store
1159            .save_message_with_metadata(cid, "user", "msg2", "[]", false, true)
1160            .await
1161            .unwrap();
1162
1163        let all_msgs = store
1164            .load_history_filtered(cid, 50, None, None)
1165            .await
1166            .unwrap();
1167        assert_eq!(all_msgs.len(), 2);
1168    }
1169
1170    #[tokio::test]
1171    async fn replace_conversation_marks_originals_and_inserts_summary() {
1172        let store = test_store().await;
1173        let cid = store.create_conversation().await.unwrap();
1174
1175        let id1 = store.save_message(cid, "user", "first").await.unwrap();
1176        let id2 = store
1177            .save_message(cid, "assistant", "second")
1178            .await
1179            .unwrap();
1180        let id3 = store.save_message(cid, "user", "third").await.unwrap();
1181
1182        let summary_id = store
1183            .replace_conversation(cid, id1..=id2, "system", "summary text")
1184            .await
1185            .unwrap();
1186
1187        // Original messages should be user_only
1188        let all = store.load_history(cid, 50).await.unwrap();
1189        // id1 and id2 marked agent_visible=false, id3 untouched, summary inserted
1190        let by_id1 = all.iter().find(|m| m.content == "first").unwrap();
1191        assert!(!by_id1.metadata.agent_visible);
1192        assert!(by_id1.metadata.user_visible);
1193
1194        let by_id2 = all.iter().find(|m| m.content == "second").unwrap();
1195        assert!(!by_id2.metadata.agent_visible);
1196
1197        let by_id3 = all.iter().find(|m| m.content == "third").unwrap();
1198        assert!(by_id3.metadata.agent_visible);
1199
1200        // Summary is agent_only (agent_visible=1, user_visible=0)
1201        let summary = all.iter().find(|m| m.content == "summary text").unwrap();
1202        assert!(summary.metadata.agent_visible);
1203        assert!(!summary.metadata.user_visible);
1204        assert!(summary_id > id3);
1205    }
1206
1207    #[tokio::test]
1208    async fn oldest_message_ids_returns_in_order() {
1209        let store = test_store().await;
1210        let cid = store.create_conversation().await.unwrap();
1211
1212        let id1 = store.save_message(cid, "user", "a").await.unwrap();
1213        let id2 = store.save_message(cid, "assistant", "b").await.unwrap();
1214        let id3 = store.save_message(cid, "user", "c").await.unwrap();
1215
1216        let ids = store.oldest_message_ids(cid, 2).await.unwrap();
1217        assert_eq!(ids, vec![id1, id2]);
1218        assert!(ids[0] < ids[1]);
1219
1220        let all_ids = store.oldest_message_ids(cid, 10).await.unwrap();
1221        assert_eq!(all_ids, vec![id1, id2, id3]);
1222    }
1223
1224    #[tokio::test]
1225    async fn message_metadata_default_both_visible() {
1226        let store = test_store().await;
1227        let cid = store.create_conversation().await.unwrap();
1228
1229        store.save_message(cid, "user", "normal").await.unwrap();
1230
1231        let history = store.load_history(cid, 10).await.unwrap();
1232        assert!(history[0].metadata.agent_visible);
1233        assert!(history[0].metadata.user_visible);
1234        assert!(history[0].metadata.compacted_at.is_none());
1235    }
1236
1237    #[tokio::test]
1238    async fn load_history_empty_parts_json_fast_path() {
1239        let store = test_store().await;
1240        let cid = store.create_conversation().await.unwrap();
1241
1242        store
1243            .save_message_with_parts(cid, "user", "hello", "[]")
1244            .await
1245            .unwrap();
1246
1247        let history = store.load_history(cid, 10).await.unwrap();
1248        assert_eq!(history.len(), 1);
1249        assert!(
1250            history[0].parts.is_empty(),
1251            "\"[]\" fast-path must yield empty parts Vec"
1252        );
1253    }
1254
1255    #[tokio::test]
1256    async fn load_history_non_empty_parts_json_parsed() {
1257        let store = test_store().await;
1258        let cid = store.create_conversation().await.unwrap();
1259
1260        let parts_json = serde_json::to_string(&vec![MessagePart::ToolResult {
1261            tool_use_id: "t1".into(),
1262            content: "result".into(),
1263            is_error: false,
1264        }])
1265        .unwrap();
1266
1267        store
1268            .save_message_with_parts(cid, "user", "hello", &parts_json)
1269            .await
1270            .unwrap();
1271
1272        let history = store.load_history(cid, 10).await.unwrap();
1273        assert_eq!(history.len(), 1);
1274        assert_eq!(history[0].parts.len(), 1);
1275        assert!(
1276            matches!(&history[0].parts[0], MessagePart::ToolResult { content, .. } if content == "result")
1277        );
1278    }
1279
1280    #[tokio::test]
1281    async fn message_by_id_empty_parts_json_fast_path() {
1282        let store = test_store().await;
1283        let cid = store.create_conversation().await.unwrap();
1284
1285        let id = store
1286            .save_message_with_parts(cid, "user", "msg", "[]")
1287            .await
1288            .unwrap();
1289
1290        let msg = store.message_by_id(id).await.unwrap().unwrap();
1291        assert!(
1292            msg.parts.is_empty(),
1293            "\"[]\" fast-path must yield empty parts Vec in message_by_id"
1294        );
1295    }
1296
1297    #[tokio::test]
1298    async fn messages_by_ids_empty_parts_json_fast_path() {
1299        let store = test_store().await;
1300        let cid = store.create_conversation().await.unwrap();
1301
1302        let id = store
1303            .save_message_with_parts(cid, "user", "msg", "[]")
1304            .await
1305            .unwrap();
1306
1307        let results = store.messages_by_ids(&[id]).await.unwrap();
1308        assert_eq!(results.len(), 1);
1309        assert!(
1310            results[0].1.parts.is_empty(),
1311            "\"[]\" fast-path must yield empty parts Vec in messages_by_ids"
1312        );
1313    }
1314
1315    #[tokio::test]
1316    async fn load_history_filtered_empty_parts_json_fast_path() {
1317        let store = test_store().await;
1318        let cid = store.create_conversation().await.unwrap();
1319
1320        store
1321            .save_message_with_metadata(cid, "user", "msg", "[]", true, true)
1322            .await
1323            .unwrap();
1324
1325        let msgs = store
1326            .load_history_filtered(cid, 10, Some(true), None)
1327            .await
1328            .unwrap();
1329        assert_eq!(msgs.len(), 1);
1330        assert!(
1331            msgs[0].parts.is_empty(),
1332            "\"[]\" fast-path must yield empty parts Vec in load_history_filtered"
1333        );
1334    }
1335}