Skip to main content

zeph_memory/sqlite/
messages.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Sanitize an arbitrary string into a valid FTS5 query.
///
/// Extracts alphanumeric tokens and joins them with single spaces, which FTS5
/// interprets as an implicit AND of all terms. FTS5 special characters (`,`, `"`,
/// `*`, `(`, `)`, `^`, `-`, `+`, ...) are stripped to avoid syntax errors in the
/// `MATCH` clause.
///
/// The barewords `AND`, `OR` and `NOT` are operators in FTS5 (case-sensitive), so a
/// dangling one such as `"cats AND"` or `"NOT working"` would make `MATCH` fail with a
/// syntax error. Those tokens are wrapped in double quotes so FTS5 treats them as
/// literal terms instead. Quoting is safe because every token is purely alphanumeric
/// and cannot contain a `"`.
///
/// Returns an empty string when the input contains no alphanumeric characters.
fn sanitize_fts5_query(query: &str) -> String {
    let mut sanitized = String::with_capacity(query.len());
    for token in query
        .split(|c: char| !c.is_alphanumeric())
        .filter(|t| !t.is_empty())
    {
        if !sanitized.is_empty() {
            sanitized.push(' ');
        }
        if matches!(token, "AND" | "OR" | "NOT") {
            sanitized.push('"');
            sanitized.push_str(token);
            sanitized.push('"');
        } else {
            sanitized.push_str(token);
        }
    }
    sanitized
}
22
23fn parse_role(s: &str) -> Role {
24    match s {
25        "assistant" => Role::Assistant,
26        "system" => Role::System,
27        _ => Role::User,
28    }
29}
30
31#[must_use]
32pub fn role_str(role: Role) -> &'static str {
33    match role {
34        Role::System => "system",
35        Role::User => "user",
36        Role::Assistant => "assistant",
37    }
38}
39
40impl SqliteStore {
41    /// Create a new conversation and return its ID.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if the insert fails.
46    pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
47        let row: (ConversationId,) =
48            sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
49                .fetch_one(&self.pool)
50                .await?;
51        Ok(row.0)
52    }
53
54    /// Save a message to the given conversation and return the message ID.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the insert fails.
59    pub async fn save_message(
60        &self,
61        conversation_id: ConversationId,
62        role: &str,
63        content: &str,
64    ) -> Result<MessageId, MemoryError> {
65        self.save_message_with_parts(conversation_id, role, content, "[]")
66            .await
67    }
68
69    /// Save a message with structured parts JSON.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the insert fails.
74    pub async fn save_message_with_parts(
75        &self,
76        conversation_id: ConversationId,
77        role: &str,
78        content: &str,
79        parts_json: &str,
80    ) -> Result<MessageId, MemoryError> {
81        self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
82            .await
83    }
84
85    /// Save a message with visibility metadata.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the insert fails.
90    pub async fn save_message_with_metadata(
91        &self,
92        conversation_id: ConversationId,
93        role: &str,
94        content: &str,
95        parts_json: &str,
96        agent_visible: bool,
97        user_visible: bool,
98    ) -> Result<MessageId, MemoryError> {
99        let row: (MessageId,) = sqlx::query_as(
100            "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible) \
101             VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
102        )
103        .bind(conversation_id)
104        .bind(role)
105        .bind(content)
106        .bind(parts_json)
107        .bind(i64::from(agent_visible))
108        .bind(i64::from(user_visible))
109        .fetch_one(&self.pool)
110        .await?;
111        Ok(row.0)
112    }
113
114    /// Load the most recent messages for a conversation, up to `limit`.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the query fails.
119    pub async fn load_history(
120        &self,
121        conversation_id: ConversationId,
122        limit: u32,
123    ) -> Result<Vec<Message>, MemoryError> {
124        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
125            "SELECT role, content, parts, agent_visible, user_visible FROM (\
126                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
127                WHERE conversation_id = ? AND deleted_at IS NULL \
128                ORDER BY id DESC \
129                LIMIT ?\
130             ) ORDER BY id ASC",
131        )
132        .bind(conversation_id)
133        .bind(limit)
134        .fetch_all(&self.pool)
135        .await?;
136
137        let messages = rows
138            .into_iter()
139            .map(
140                |(role_str, content, parts_json, agent_visible, user_visible)| {
141                    let parts: Vec<MessagePart> = if parts_json == "[]" {
142                        vec![]
143                    } else {
144                        serde_json::from_str(&parts_json).unwrap_or_default()
145                    };
146                    Message {
147                        role: parse_role(&role_str),
148                        content,
149                        parts,
150                        metadata: MessageMetadata {
151                            agent_visible: agent_visible != 0,
152                            user_visible: user_visible != 0,
153                            compacted_at: None,
154                        },
155                    }
156                },
157            )
158            .collect();
159        Ok(messages)
160    }
161
162    /// Load messages filtered by visibility flags.
163    ///
164    /// Pass `Some(true)` to filter by a flag, `None` to skip filtering.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the query fails.
169    pub async fn load_history_filtered(
170        &self,
171        conversation_id: ConversationId,
172        limit: u32,
173        agent_visible: Option<bool>,
174        user_visible: Option<bool>,
175    ) -> Result<Vec<Message>, MemoryError> {
176        let av = agent_visible.map(i64::from);
177        let uv = user_visible.map(i64::from);
178
179        let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
180            "WITH recent AS (\
181                SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
182                WHERE conversation_id = ? \
183                  AND deleted_at IS NULL \
184                  AND (? IS NULL OR agent_visible = ?) \
185                  AND (? IS NULL OR user_visible = ?) \
186                ORDER BY id DESC \
187                LIMIT ?\
188             ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
189        )
190        .bind(conversation_id)
191        .bind(av)
192        .bind(av)
193        .bind(uv)
194        .bind(uv)
195        .bind(limit)
196        .fetch_all(&self.pool)
197        .await?;
198
199        let messages = rows
200            .into_iter()
201            .map(
202                |(role_str, content, parts_json, agent_visible, user_visible)| {
203                    let parts: Vec<MessagePart> = if parts_json == "[]" {
204                        vec![]
205                    } else {
206                        serde_json::from_str(&parts_json).unwrap_or_default()
207                    };
208                    Message {
209                        role: parse_role(&role_str),
210                        content,
211                        parts,
212                        metadata: MessageMetadata {
213                            agent_visible: agent_visible != 0,
214                            user_visible: user_visible != 0,
215                            compacted_at: None,
216                        },
217                    }
218                },
219            )
220            .collect();
221        Ok(messages)
222    }
223
224    /// Atomically mark a range of messages as user-only and insert a summary as agent-only.
225    ///
226    /// Within a single transaction:
227    /// 1. Updates `agent_visible=0, compacted_at=now` for messages in `compacted_range`.
228    /// 2. Inserts `summary_content` with `agent_visible=1, user_visible=0`.
229    ///
230    /// Returns the `MessageId` of the inserted summary.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the transaction fails.
235    pub async fn replace_conversation(
236        &self,
237        conversation_id: ConversationId,
238        compacted_range: std::ops::RangeInclusive<MessageId>,
239        summary_role: &str,
240        summary_content: &str,
241    ) -> Result<MessageId, MemoryError> {
242        let now = {
243            let secs = std::time::SystemTime::now()
244                .duration_since(std::time::UNIX_EPOCH)
245                .unwrap_or_default()
246                .as_secs();
247            format!("{secs}")
248        };
249        let start_id = compacted_range.start().0;
250        let end_id = compacted_range.end().0;
251
252        let mut tx = self.pool.begin().await?;
253
254        sqlx::query(
255            "UPDATE messages SET agent_visible = 0, compacted_at = ? \
256             WHERE conversation_id = ? AND id >= ? AND id <= ?",
257        )
258        .bind(&now)
259        .bind(conversation_id)
260        .bind(start_id)
261        .bind(end_id)
262        .execute(&mut *tx)
263        .await?;
264
265        let row: (MessageId,) = sqlx::query_as(
266            "INSERT INTO messages \
267             (conversation_id, role, content, parts, agent_visible, user_visible) \
268             VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
269        )
270        .bind(conversation_id)
271        .bind(summary_role)
272        .bind(summary_content)
273        .fetch_one(&mut *tx)
274        .await?;
275
276        tx.commit().await?;
277
278        Ok(row.0)
279    }
280
281    /// Return the IDs of the N oldest messages in a conversation (ascending order).
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the query fails.
286    pub async fn oldest_message_ids(
287        &self,
288        conversation_id: ConversationId,
289        n: u32,
290    ) -> Result<Vec<MessageId>, MemoryError> {
291        let rows: Vec<(MessageId,)> = sqlx::query_as(
292            "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
293        )
294        .bind(conversation_id)
295        .bind(n)
296        .fetch_all(&self.pool)
297        .await?;
298        Ok(rows.into_iter().map(|r| r.0).collect())
299    }
300
301    /// Return the ID of the most recent conversation, if any.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the query fails.
306    pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
307        let row: Option<(ConversationId,)> =
308            sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
309                .fetch_optional(&self.pool)
310                .await?;
311        Ok(row.map(|r| r.0))
312    }
313
314    /// Fetch a single message by its ID.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if the query fails.
319    pub async fn message_by_id(
320        &self,
321        message_id: MessageId,
322    ) -> Result<Option<Message>, MemoryError> {
323        let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
324            "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
325        )
326        .bind(message_id)
327        .fetch_optional(&self.pool)
328        .await?;
329
330        Ok(row.map(
331            |(role_str, content, parts_json, agent_visible, user_visible)| {
332                let parts: Vec<MessagePart> = if parts_json == "[]" {
333                    vec![]
334                } else {
335                    serde_json::from_str(&parts_json).unwrap_or_default()
336                };
337                Message {
338                    role: parse_role(&role_str),
339                    content,
340                    parts,
341                    metadata: MessageMetadata {
342                        agent_visible: agent_visible != 0,
343                        user_visible: user_visible != 0,
344                        compacted_at: None,
345                    },
346                }
347            },
348        ))
349    }
350
351    /// Fetch messages by a list of IDs in a single query.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the query fails.
356    pub async fn messages_by_ids(
357        &self,
358        ids: &[MessageId],
359    ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
360        if ids.is_empty() {
361            return Ok(Vec::new());
362        }
363
364        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
365
366        let query = format!(
367            "SELECT id, role, content, parts FROM messages \
368             WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
369        );
370        let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
371        for &id in ids {
372            q = q.bind(id);
373        }
374
375        let rows = q.fetch_all(&self.pool).await?;
376
377        Ok(rows
378            .into_iter()
379            .map(|(id, role_str, content, parts_json)| {
380                let parts: Vec<MessagePart> = if parts_json == "[]" {
381                    vec![]
382                } else {
383                    serde_json::from_str(&parts_json).unwrap_or_default()
384                };
385                (
386                    id,
387                    Message {
388                        role: parse_role(&role_str),
389                        content,
390                        parts,
391                        metadata: MessageMetadata::default(),
392                    },
393                )
394            })
395            .collect())
396    }
397
398    /// Return message IDs and content for messages without embeddings.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the query fails.
403    pub async fn unembedded_message_ids(
404        &self,
405        limit: Option<usize>,
406    ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
407        let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
408
409        let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
410            "SELECT m.id, m.conversation_id, m.role, m.content \
411             FROM messages m \
412             LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
413             WHERE em.id IS NULL AND m.deleted_at IS NULL \
414             ORDER BY m.id ASC \
415             LIMIT ?",
416        )
417        .bind(effective_limit)
418        .fetch_all(&self.pool)
419        .await?;
420
421        Ok(rows)
422    }
423
424    /// Count the number of messages in a conversation.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the query fails.
429    pub async fn count_messages(
430        &self,
431        conversation_id: ConversationId,
432    ) -> Result<i64, MemoryError> {
433        let row: (i64,) = sqlx::query_as(
434            "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
435        )
436        .bind(conversation_id)
437        .fetch_one(&self.pool)
438        .await?;
439        Ok(row.0)
440    }
441
442    /// Count messages in a conversation with id greater than `after_id`.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the query fails.
447    pub async fn count_messages_after(
448        &self,
449        conversation_id: ConversationId,
450        after_id: MessageId,
451    ) -> Result<i64, MemoryError> {
452        let row: (i64,) =
453            sqlx::query_as(
454                "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
455            )
456            .bind(conversation_id)
457            .bind(after_id)
458            .fetch_one(&self.pool)
459            .await?;
460        Ok(row.0)
461    }
462
463    /// Full-text keyword search over messages using FTS5.
464    ///
465    /// Returns message IDs with BM25 relevance scores (lower = more relevant,
466    /// negated to positive for consistency with vector scores).
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the query fails.
471    pub async fn keyword_search(
472        &self,
473        query: &str,
474        limit: usize,
475        conversation_id: Option<ConversationId>,
476    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
477        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
478        let safe_query = sanitize_fts5_query(query);
479        if safe_query.is_empty() {
480            return Ok(Vec::new());
481        }
482
483        let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
484            sqlx::query_as(
485                "SELECT m.id, -rank AS score \
486                 FROM messages_fts f \
487                 JOIN messages m ON m.id = f.rowid \
488                 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
489                 ORDER BY rank \
490                 LIMIT ?",
491            )
492            .bind(&safe_query)
493            .bind(cid)
494            .bind(effective_limit)
495            .fetch_all(&self.pool)
496            .await?
497        } else {
498            sqlx::query_as(
499                "SELECT m.id, -rank AS score \
500                 FROM messages_fts f \
501                 JOIN messages m ON m.id = f.rowid \
502                 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
503                 ORDER BY rank \
504                 LIMIT ?",
505            )
506            .bind(&safe_query)
507            .bind(effective_limit)
508            .fetch_all(&self.pool)
509            .await?
510        };
511
512        Ok(rows)
513    }
514
515    /// Fetch creation timestamps (Unix epoch seconds) for the given message IDs.
516    ///
517    /// Messages without a `created_at` column fall back to 0.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the query fails.
522    pub async fn message_timestamps(
523        &self,
524        ids: &[MessageId],
525    ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
526        if ids.is_empty() {
527            return Ok(std::collections::HashMap::new());
528        }
529
530        let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
531        let query = format!(
532            "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
533             FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
534        );
535        let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
536        for &id in ids {
537            q = q.bind(id);
538        }
539
540        let rows = q.fetch_all(&self.pool).await?;
541        Ok(rows.into_iter().collect())
542    }
543
544    /// Load a range of messages after a given message ID.
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if the query fails.
549    pub async fn load_messages_range(
550        &self,
551        conversation_id: ConversationId,
552        after_message_id: MessageId,
553        limit: usize,
554    ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
555        let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
556
557        let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
558            "SELECT id, role, content FROM messages \
559             WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
560             ORDER BY id ASC LIMIT ?",
561        )
562        .bind(conversation_id)
563        .bind(after_message_id)
564        .bind(effective_limit)
565        .fetch_all(&self.pool)
566        .await?;
567
568        Ok(rows)
569    }
570
571    // ── Eviction helpers ──────────────────────────────────────────────────────
572
573    /// Return all non-deleted message IDs with their eviction metadata.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the query fails.
578    pub async fn get_eviction_candidates(
579        &self,
580    ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
581        let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
582            "SELECT id, created_at, last_accessed, access_count \
583             FROM messages WHERE deleted_at IS NULL",
584        )
585        .fetch_all(&self.pool)
586        .await?;
587
588        Ok(rows
589            .into_iter()
590            .map(
591                |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
592                    id,
593                    created_at,
594                    last_accessed,
595                    access_count: access_count.try_into().unwrap_or(0),
596                },
597            )
598            .collect())
599    }
600
601    /// Soft-delete a set of messages by marking `deleted_at`.
602    ///
603    /// Soft-deleted messages are excluded from all history queries.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if the update fails.
608    pub async fn soft_delete_messages(
609        &self,
610        ids: &[MessageId],
611    ) -> Result<(), crate::error::MemoryError> {
612        if ids.is_empty() {
613            return Ok(());
614        }
615        // SQLite does not support array binding natively. Batch via individual updates.
616        for &id in ids {
617            sqlx::query(
618                "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
619            )
620            .bind(id)
621            .execute(&self.pool)
622            .await?;
623        }
624        Ok(())
625    }
626
627    /// Return IDs of soft-deleted messages that have not yet been cleaned from Qdrant.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the query fails.
632    pub async fn get_soft_deleted_message_ids(
633        &self,
634    ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
635        let rows: Vec<(MessageId,)> = sqlx::query_as(
636            "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
637        )
638        .fetch_all(&self.pool)
639        .await?;
640        Ok(rows.into_iter().map(|(id,)| id).collect())
641    }
642
643    /// Mark a set of soft-deleted messages as Qdrant-cleaned.
644    ///
645    /// # Errors
646    ///
647    /// Returns an error if the update fails.
648    pub async fn mark_qdrant_cleaned(
649        &self,
650        ids: &[MessageId],
651    ) -> Result<(), crate::error::MemoryError> {
652        for &id in ids {
653            sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
654                .bind(id)
655                .execute(&self.pool)
656                .await?;
657        }
658        Ok(())
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665
666    async fn test_store() -> SqliteStore {
667        SqliteStore::new(":memory:").await.unwrap()
668    }
669
670    #[tokio::test]
671    async fn create_conversation_returns_id() {
672        let store = test_store().await;
673        let id1 = store.create_conversation().await.unwrap();
674        let id2 = store.create_conversation().await.unwrap();
675        assert_eq!(id1, ConversationId(1));
676        assert_eq!(id2, ConversationId(2));
677    }
678
679    #[tokio::test]
680    async fn save_and_load_messages() {
681        let store = test_store().await;
682        let cid = store.create_conversation().await.unwrap();
683
684        let msg_id1 = store.save_message(cid, "user", "hello").await.unwrap();
685        let msg_id2 = store
686            .save_message(cid, "assistant", "hi there")
687            .await
688            .unwrap();
689
690        assert_eq!(msg_id1, MessageId(1));
691        assert_eq!(msg_id2, MessageId(2));
692
693        let history = store.load_history(cid, 50).await.unwrap();
694        assert_eq!(history.len(), 2);
695        assert_eq!(history[0].role, Role::User);
696        assert_eq!(history[0].content, "hello");
697        assert_eq!(history[1].role, Role::Assistant);
698        assert_eq!(history[1].content, "hi there");
699    }
700
701    #[tokio::test]
702    async fn load_history_respects_limit() {
703        let store = test_store().await;
704        let cid = store.create_conversation().await.unwrap();
705
706        for i in 0..10 {
707            store
708                .save_message(cid, "user", &format!("msg {i}"))
709                .await
710                .unwrap();
711        }
712
713        let history = store.load_history(cid, 3).await.unwrap();
714        assert_eq!(history.len(), 3);
715        assert_eq!(history[0].content, "msg 7");
716        assert_eq!(history[1].content, "msg 8");
717        assert_eq!(history[2].content, "msg 9");
718    }
719
720    #[tokio::test]
721    async fn latest_conversation_id_empty() {
722        let store = test_store().await;
723        assert!(store.latest_conversation_id().await.unwrap().is_none());
724    }
725
726    #[tokio::test]
727    async fn latest_conversation_id_returns_newest() {
728        let store = test_store().await;
729        store.create_conversation().await.unwrap();
730        let id2 = store.create_conversation().await.unwrap();
731        assert_eq!(store.latest_conversation_id().await.unwrap(), Some(id2));
732    }
733
734    #[tokio::test]
735    async fn messages_isolated_per_conversation() {
736        let store = test_store().await;
737        let cid1 = store.create_conversation().await.unwrap();
738        let cid2 = store.create_conversation().await.unwrap();
739
740        store.save_message(cid1, "user", "conv1").await.unwrap();
741        store.save_message(cid2, "user", "conv2").await.unwrap();
742
743        let h1 = store.load_history(cid1, 50).await.unwrap();
744        let h2 = store.load_history(cid2, 50).await.unwrap();
745        assert_eq!(h1.len(), 1);
746        assert_eq!(h1[0].content, "conv1");
747        assert_eq!(h2.len(), 1);
748        assert_eq!(h2[0].content, "conv2");
749    }
750
751    #[tokio::test]
752    async fn pool_accessor_returns_valid_pool() {
753        let store = test_store().await;
754        let pool = store.pool();
755        let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(pool).await.unwrap();
756        assert_eq!(row.0, 1);
757    }
758
759    #[tokio::test]
760    async fn embeddings_metadata_table_exists() {
761        let store = test_store().await;
762        let result: (i64,) = sqlx::query_as(
763            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embeddings_metadata'",
764        )
765        .fetch_one(store.pool())
766        .await
767        .unwrap();
768        assert_eq!(result.0, 1);
769    }
770
771    #[tokio::test]
772    async fn cascade_delete_removes_embeddings_metadata() {
773        let store = test_store().await;
774        let pool = store.pool();
775
776        let cid = store.create_conversation().await.unwrap();
777        let msg_id = store.save_message(cid, "user", "test").await.unwrap();
778
779        let point_id = uuid::Uuid::new_v4().to_string();
780        sqlx::query(
781            "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
782             VALUES (?, ?, ?)",
783        )
784        .bind(msg_id)
785        .bind(&point_id)
786        .bind(768_i64)
787        .execute(pool)
788        .await
789        .unwrap();
790
791        let before: (i64,) =
792            sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
793                .bind(msg_id)
794                .fetch_one(pool)
795                .await
796                .unwrap();
797        assert_eq!(before.0, 1);
798
799        sqlx::query("DELETE FROM messages WHERE id = ?")
800            .bind(msg_id)
801            .execute(pool)
802            .await
803            .unwrap();
804
805        let after: (i64,) =
806            sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
807                .bind(msg_id)
808                .fetch_one(pool)
809                .await
810                .unwrap();
811        assert_eq!(after.0, 0);
812    }
813
814    #[tokio::test]
815    async fn messages_by_ids_batch_fetch() {
816        let store = test_store().await;
817        let cid = store.create_conversation().await.unwrap();
818        let id1 = store.save_message(cid, "user", "hello").await.unwrap();
819        let id2 = store.save_message(cid, "assistant", "hi").await.unwrap();
820        let _id3 = store.save_message(cid, "user", "bye").await.unwrap();
821
822        let results = store.messages_by_ids(&[id1, id2]).await.unwrap();
823        assert_eq!(results.len(), 2);
824        assert_eq!(results[0].0, id1);
825        assert_eq!(results[0].1.content, "hello");
826        assert_eq!(results[1].0, id2);
827        assert_eq!(results[1].1.content, "hi");
828    }
829
830    #[tokio::test]
831    async fn messages_by_ids_empty_input() {
832        let store = test_store().await;
833        let results = store.messages_by_ids(&[]).await.unwrap();
834        assert!(results.is_empty());
835    }
836
837    #[tokio::test]
838    async fn messages_by_ids_nonexistent() {
839        let store = test_store().await;
840        let results = store
841            .messages_by_ids(&[MessageId(999), MessageId(1000)])
842            .await
843            .unwrap();
844        assert!(results.is_empty());
845    }
846
847    #[tokio::test]
848    async fn message_by_id_fetches_existing() {
849        let store = test_store().await;
850        let cid = store.create_conversation().await.unwrap();
851        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
852
853        let msg = store.message_by_id(msg_id).await.unwrap();
854        assert!(msg.is_some());
855        let msg = msg.unwrap();
856        assert_eq!(msg.role, Role::User);
857        assert_eq!(msg.content, "hello");
858    }
859
860    #[tokio::test]
861    async fn message_by_id_returns_none_for_nonexistent() {
862        let store = test_store().await;
863        let msg = store.message_by_id(MessageId(999)).await.unwrap();
864        assert!(msg.is_none());
865    }
866
867    #[tokio::test]
868    async fn unembedded_message_ids_returns_all_when_none_embedded() {
869        let store = test_store().await;
870        let cid = store.create_conversation().await.unwrap();
871
872        store.save_message(cid, "user", "msg1").await.unwrap();
873        store.save_message(cid, "assistant", "msg2").await.unwrap();
874
875        let unembedded = store.unembedded_message_ids(None).await.unwrap();
876        assert_eq!(unembedded.len(), 2);
877        assert_eq!(unembedded[0].3, "msg1");
878        assert_eq!(unembedded[1].3, "msg2");
879    }
880
881    #[tokio::test]
882    async fn unembedded_message_ids_excludes_embedded() {
883        let store = test_store().await;
884        let pool = store.pool();
885        let cid = store.create_conversation().await.unwrap();
886
887        let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
888        let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
889
890        let point_id = uuid::Uuid::new_v4().to_string();
891        sqlx::query(
892            "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
893             VALUES (?, ?, ?)",
894        )
895        .bind(msg_id1)
896        .bind(&point_id)
897        .bind(768_i64)
898        .execute(pool)
899        .await
900        .unwrap();
901
902        let unembedded = store.unembedded_message_ids(None).await.unwrap();
903        assert_eq!(unembedded.len(), 1);
904        assert_eq!(unembedded[0].0, msg_id2);
905        assert_eq!(unembedded[0].3, "msg2");
906    }
907
908    #[tokio::test]
909    async fn unembedded_message_ids_respects_limit() {
910        let store = test_store().await;
911        let cid = store.create_conversation().await.unwrap();
912
913        for i in 0..10 {
914            store
915                .save_message(cid, "user", &format!("msg{i}"))
916                .await
917                .unwrap();
918        }
919
920        let unembedded = store.unembedded_message_ids(Some(3)).await.unwrap();
921        assert_eq!(unembedded.len(), 3);
922    }
923
924    #[tokio::test]
925    async fn count_messages_returns_correct_count() {
926        let store = test_store().await;
927        let cid = store.create_conversation().await.unwrap();
928
929        assert_eq!(store.count_messages(cid).await.unwrap(), 0);
930
931        store.save_message(cid, "user", "msg1").await.unwrap();
932        store.save_message(cid, "assistant", "msg2").await.unwrap();
933
934        assert_eq!(store.count_messages(cid).await.unwrap(), 2);
935    }
936
937    #[tokio::test]
938    async fn count_messages_after_filters_correctly() {
939        let store = test_store().await;
940        let cid = store.create_conversation().await.unwrap();
941
942        let id1 = store.save_message(cid, "user", "msg1").await.unwrap();
943        let _id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
944        let id3 = store.save_message(cid, "user", "msg3").await.unwrap();
945
946        assert_eq!(
947            store.count_messages_after(cid, MessageId(0)).await.unwrap(),
948            3
949        );
950        assert_eq!(store.count_messages_after(cid, id1).await.unwrap(), 2);
951        assert_eq!(store.count_messages_after(cid, id3).await.unwrap(), 0);
952    }
953
954    #[tokio::test]
955    async fn load_messages_range_basic() {
956        let store = test_store().await;
957        let cid = store.create_conversation().await.unwrap();
958
959        let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
960        let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
961        let msg_id3 = store.save_message(cid, "user", "msg3").await.unwrap();
962
963        let msgs = store.load_messages_range(cid, msg_id1, 10).await.unwrap();
964        assert_eq!(msgs.len(), 2);
965        assert_eq!(msgs[0].0, msg_id2);
966        assert_eq!(msgs[0].2, "msg2");
967        assert_eq!(msgs[1].0, msg_id3);
968        assert_eq!(msgs[1].2, "msg3");
969    }
970
971    #[tokio::test]
972    async fn load_messages_range_respects_limit() {
973        let store = test_store().await;
974        let cid = store.create_conversation().await.unwrap();
975
976        store.save_message(cid, "user", "msg1").await.unwrap();
977        store.save_message(cid, "assistant", "msg2").await.unwrap();
978        store.save_message(cid, "user", "msg3").await.unwrap();
979
980        let msgs = store
981            .load_messages_range(cid, MessageId(0), 2)
982            .await
983            .unwrap();
984        assert_eq!(msgs.len(), 2);
985    }
986
987    #[tokio::test]
988    async fn keyword_search_basic() {
989        let store = test_store().await;
990        let cid = store.create_conversation().await.unwrap();
991
992        store
993            .save_message(cid, "user", "rust programming language")
994            .await
995            .unwrap();
996        store
997            .save_message(cid, "assistant", "python is great too")
998            .await
999            .unwrap();
1000        store
1001            .save_message(cid, "user", "I love rust and cargo")
1002            .await
1003            .unwrap();
1004
1005        let results = store.keyword_search("rust", 10, None).await.unwrap();
1006        assert_eq!(results.len(), 2);
1007        assert!(results.iter().all(|(_, score)| *score > 0.0));
1008    }
1009
1010    #[tokio::test]
1011    async fn keyword_search_with_conversation_filter() {
1012        let store = test_store().await;
1013        let cid1 = store.create_conversation().await.unwrap();
1014        let cid2 = store.create_conversation().await.unwrap();
1015
1016        store
1017            .save_message(cid1, "user", "hello world")
1018            .await
1019            .unwrap();
1020        store
1021            .save_message(cid2, "user", "hello universe")
1022            .await
1023            .unwrap();
1024
1025        let results = store.keyword_search("hello", 10, Some(cid1)).await.unwrap();
1026        assert_eq!(results.len(), 1);
1027    }
1028
1029    #[tokio::test]
1030    async fn keyword_search_no_match() {
1031        let store = test_store().await;
1032        let cid = store.create_conversation().await.unwrap();
1033
1034        store
1035            .save_message(cid, "user", "hello world")
1036            .await
1037            .unwrap();
1038
1039        let results = store.keyword_search("nonexistent", 10, None).await.unwrap();
1040        assert!(results.is_empty());
1041    }
1042
1043    #[tokio::test]
1044    async fn keyword_search_respects_limit() {
1045        let store = test_store().await;
1046        let cid = store.create_conversation().await.unwrap();
1047
1048        for i in 0..10 {
1049            store
1050                .save_message(cid, "user", &format!("test message {i}"))
1051                .await
1052                .unwrap();
1053        }
1054
1055        let results = store.keyword_search("test", 3, None).await.unwrap();
1056        assert_eq!(results.len(), 3);
1057    }
1058
1059    #[test]
1060    fn sanitize_fts5_query_strips_special_chars() {
1061        assert_eq!(sanitize_fts5_query("skill-audit"), "skill audit");
1062        assert_eq!(sanitize_fts5_query("hello, world"), "hello world");
1063        assert_eq!(sanitize_fts5_query("a+b*c^d"), "a b c d");
1064        assert_eq!(sanitize_fts5_query("  "), "");
1065        assert_eq!(sanitize_fts5_query("rust programming"), "rust programming");
1066    }
1067
1068    #[tokio::test]
1069    async fn keyword_search_with_special_chars_does_not_error() {
1070        let store = test_store().await;
1071        let cid = store.create_conversation().await.unwrap();
1072        store
1073            .save_message(cid, "user", "skill audit info")
1074            .await
1075            .unwrap();
1076        // query with comma and special chars — previously caused FTS5 syntax error
1077        // result may be empty; important is that no error is returned
1078        store
1079            .keyword_search("skill-audit, confidence=0.1", 10, None)
1080            .await
1081            .unwrap();
1082    }
1083
1084    #[tokio::test]
1085    async fn save_message_with_metadata_stores_visibility() {
1086        let store = test_store().await;
1087        let cid = store.create_conversation().await.unwrap();
1088
1089        let id = store
1090            .save_message_with_metadata(cid, "user", "hello", "[]", false, true)
1091            .await
1092            .unwrap();
1093
1094        let history = store.load_history(cid, 10).await.unwrap();
1095        assert_eq!(history.len(), 1);
1096        assert!(!history[0].metadata.agent_visible);
1097        assert!(history[0].metadata.user_visible);
1098        assert_eq!(id, MessageId(1));
1099    }
1100
1101    #[tokio::test]
1102    async fn load_history_filtered_by_agent_visible() {
1103        let store = test_store().await;
1104        let cid = store.create_conversation().await.unwrap();
1105
1106        store
1107            .save_message_with_metadata(cid, "user", "visible to agent", "[]", true, true)
1108            .await
1109            .unwrap();
1110        store
1111            .save_message_with_metadata(cid, "user", "user only", "[]", false, true)
1112            .await
1113            .unwrap();
1114
1115        let agent_msgs = store
1116            .load_history_filtered(cid, 50, Some(true), None)
1117            .await
1118            .unwrap();
1119        assert_eq!(agent_msgs.len(), 1);
1120        assert_eq!(agent_msgs[0].content, "visible to agent");
1121    }
1122
1123    #[tokio::test]
1124    async fn load_history_filtered_by_user_visible() {
1125        let store = test_store().await;
1126        let cid = store.create_conversation().await.unwrap();
1127
1128        store
1129            .save_message_with_metadata(cid, "system", "agent only summary", "[]", true, false)
1130            .await
1131            .unwrap();
1132        store
1133            .save_message_with_metadata(cid, "user", "user sees this", "[]", true, true)
1134            .await
1135            .unwrap();
1136
1137        let user_msgs = store
1138            .load_history_filtered(cid, 50, None, Some(true))
1139            .await
1140            .unwrap();
1141        assert_eq!(user_msgs.len(), 1);
1142        assert_eq!(user_msgs[0].content, "user sees this");
1143    }
1144
1145    #[tokio::test]
1146    async fn load_history_filtered_no_filter_returns_all() {
1147        let store = test_store().await;
1148        let cid = store.create_conversation().await.unwrap();
1149
1150        store
1151            .save_message_with_metadata(cid, "user", "msg1", "[]", true, false)
1152            .await
1153            .unwrap();
1154        store
1155            .save_message_with_metadata(cid, "user", "msg2", "[]", false, true)
1156            .await
1157            .unwrap();
1158
1159        let all_msgs = store
1160            .load_history_filtered(cid, 50, None, None)
1161            .await
1162            .unwrap();
1163        assert_eq!(all_msgs.len(), 2);
1164    }
1165
1166    #[tokio::test]
1167    async fn replace_conversation_marks_originals_and_inserts_summary() {
1168        let store = test_store().await;
1169        let cid = store.create_conversation().await.unwrap();
1170
1171        let id1 = store.save_message(cid, "user", "first").await.unwrap();
1172        let id2 = store
1173            .save_message(cid, "assistant", "second")
1174            .await
1175            .unwrap();
1176        let id3 = store.save_message(cid, "user", "third").await.unwrap();
1177
1178        let summary_id = store
1179            .replace_conversation(cid, id1..=id2, "system", "summary text")
1180            .await
1181            .unwrap();
1182
1183        // Original messages should be user_only
1184        let all = store.load_history(cid, 50).await.unwrap();
1185        // id1 and id2 marked agent_visible=false, id3 untouched, summary inserted
1186        let by_id1 = all.iter().find(|m| m.content == "first").unwrap();
1187        assert!(!by_id1.metadata.agent_visible);
1188        assert!(by_id1.metadata.user_visible);
1189
1190        let by_id2 = all.iter().find(|m| m.content == "second").unwrap();
1191        assert!(!by_id2.metadata.agent_visible);
1192
1193        let by_id3 = all.iter().find(|m| m.content == "third").unwrap();
1194        assert!(by_id3.metadata.agent_visible);
1195
1196        // Summary is agent_only (agent_visible=1, user_visible=0)
1197        let summary = all.iter().find(|m| m.content == "summary text").unwrap();
1198        assert!(summary.metadata.agent_visible);
1199        assert!(!summary.metadata.user_visible);
1200        assert!(summary_id > id3);
1201    }
1202
1203    #[tokio::test]
1204    async fn oldest_message_ids_returns_in_order() {
1205        let store = test_store().await;
1206        let cid = store.create_conversation().await.unwrap();
1207
1208        let id1 = store.save_message(cid, "user", "a").await.unwrap();
1209        let id2 = store.save_message(cid, "assistant", "b").await.unwrap();
1210        let id3 = store.save_message(cid, "user", "c").await.unwrap();
1211
1212        let ids = store.oldest_message_ids(cid, 2).await.unwrap();
1213        assert_eq!(ids, vec![id1, id2]);
1214        assert!(ids[0] < ids[1]);
1215
1216        let all_ids = store.oldest_message_ids(cid, 10).await.unwrap();
1217        assert_eq!(all_ids, vec![id1, id2, id3]);
1218    }
1219
1220    #[tokio::test]
1221    async fn message_metadata_default_both_visible() {
1222        let store = test_store().await;
1223        let cid = store.create_conversation().await.unwrap();
1224
1225        store.save_message(cid, "user", "normal").await.unwrap();
1226
1227        let history = store.load_history(cid, 10).await.unwrap();
1228        assert!(history[0].metadata.agent_visible);
1229        assert!(history[0].metadata.user_visible);
1230        assert!(history[0].metadata.compacted_at.is_none());
1231    }
1232
1233    #[tokio::test]
1234    async fn load_history_empty_parts_json_fast_path() {
1235        let store = test_store().await;
1236        let cid = store.create_conversation().await.unwrap();
1237
1238        store
1239            .save_message_with_parts(cid, "user", "hello", "[]")
1240            .await
1241            .unwrap();
1242
1243        let history = store.load_history(cid, 10).await.unwrap();
1244        assert_eq!(history.len(), 1);
1245        assert!(
1246            history[0].parts.is_empty(),
1247            "\"[]\" fast-path must yield empty parts Vec"
1248        );
1249    }
1250
1251    #[tokio::test]
1252    async fn load_history_non_empty_parts_json_parsed() {
1253        let store = test_store().await;
1254        let cid = store.create_conversation().await.unwrap();
1255
1256        let parts_json = serde_json::to_string(&vec![MessagePart::ToolResult {
1257            tool_use_id: "t1".into(),
1258            content: "result".into(),
1259            is_error: false,
1260        }])
1261        .unwrap();
1262
1263        store
1264            .save_message_with_parts(cid, "user", "hello", &parts_json)
1265            .await
1266            .unwrap();
1267
1268        let history = store.load_history(cid, 10).await.unwrap();
1269        assert_eq!(history.len(), 1);
1270        assert_eq!(history[0].parts.len(), 1);
1271        assert!(
1272            matches!(&history[0].parts[0], MessagePart::ToolResult { content, .. } if content == "result")
1273        );
1274    }
1275
1276    #[tokio::test]
1277    async fn message_by_id_empty_parts_json_fast_path() {
1278        let store = test_store().await;
1279        let cid = store.create_conversation().await.unwrap();
1280
1281        let id = store
1282            .save_message_with_parts(cid, "user", "msg", "[]")
1283            .await
1284            .unwrap();
1285
1286        let msg = store.message_by_id(id).await.unwrap().unwrap();
1287        assert!(
1288            msg.parts.is_empty(),
1289            "\"[]\" fast-path must yield empty parts Vec in message_by_id"
1290        );
1291    }
1292
1293    #[tokio::test]
1294    async fn messages_by_ids_empty_parts_json_fast_path() {
1295        let store = test_store().await;
1296        let cid = store.create_conversation().await.unwrap();
1297
1298        let id = store
1299            .save_message_with_parts(cid, "user", "msg", "[]")
1300            .await
1301            .unwrap();
1302
1303        let results = store.messages_by_ids(&[id]).await.unwrap();
1304        assert_eq!(results.len(), 1);
1305        assert!(
1306            results[0].1.parts.is_empty(),
1307            "\"[]\" fast-path must yield empty parts Vec in messages_by_ids"
1308        );
1309    }
1310
1311    #[tokio::test]
1312    async fn load_history_filtered_empty_parts_json_fast_path() {
1313        let store = test_store().await;
1314        let cid = store.create_conversation().await.unwrap();
1315
1316        store
1317            .save_message_with_metadata(cid, "user", "msg", "[]", true, true)
1318            .await
1319            .unwrap();
1320
1321        let msgs = store
1322            .load_history_filtered(cid, 10, Some(true), None)
1323            .await
1324            .unwrap();
1325        assert_eq!(msgs.len(), 1);
1326        assert!(
1327            msgs[0].parts.is_empty(),
1328            "\"[]\" fast-path must yield empty parts Vec in load_history_filtered"
1329        );
1330    }
1331}