1use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
10fn sanitize_fts5_query(query: &str) -> String {
16 query
17 .split(|c: char| !c.is_alphanumeric())
18 .filter(|t| !t.is_empty())
19 .collect::<Vec<_>>()
20 .join(" ")
21}
22
23fn parse_role(s: &str) -> Role {
24 match s {
25 "assistant" => Role::Assistant,
26 "system" => Role::System,
27 _ => Role::User,
28 }
29}
30
31#[must_use]
32pub fn role_str(role: Role) -> &'static str {
33 match role {
34 Role::System => "system",
35 Role::User => "user",
36 Role::Assistant => "assistant",
37 }
38}
39
40impl SqliteStore {
41 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
47 let row: (ConversationId,) =
48 sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
49 .fetch_one(&self.pool)
50 .await?;
51 Ok(row.0)
52 }
53
54 pub async fn save_message(
60 &self,
61 conversation_id: ConversationId,
62 role: &str,
63 content: &str,
64 ) -> Result<MessageId, MemoryError> {
65 self.save_message_with_parts(conversation_id, role, content, "[]")
66 .await
67 }
68
69 pub async fn save_message_with_parts(
75 &self,
76 conversation_id: ConversationId,
77 role: &str,
78 content: &str,
79 parts_json: &str,
80 ) -> Result<MessageId, MemoryError> {
81 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
82 .await
83 }
84
85 pub async fn save_message_with_metadata(
91 &self,
92 conversation_id: ConversationId,
93 role: &str,
94 content: &str,
95 parts_json: &str,
96 agent_visible: bool,
97 user_visible: bool,
98 ) -> Result<MessageId, MemoryError> {
99 let row: (MessageId,) = sqlx::query_as(
100 "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible) \
101 VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
102 )
103 .bind(conversation_id)
104 .bind(role)
105 .bind(content)
106 .bind(parts_json)
107 .bind(i64::from(agent_visible))
108 .bind(i64::from(user_visible))
109 .fetch_one(&self.pool)
110 .await?;
111 Ok(row.0)
112 }
113
114 pub async fn load_history(
120 &self,
121 conversation_id: ConversationId,
122 limit: u32,
123 ) -> Result<Vec<Message>, MemoryError> {
124 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
125 "SELECT role, content, parts, agent_visible, user_visible FROM (\
126 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
127 WHERE conversation_id = ? AND deleted_at IS NULL \
128 ORDER BY id DESC \
129 LIMIT ?\
130 ) ORDER BY id ASC",
131 )
132 .bind(conversation_id)
133 .bind(limit)
134 .fetch_all(&self.pool)
135 .await?;
136
137 let messages = rows
138 .into_iter()
139 .map(
140 |(role_str, content, parts_json, agent_visible, user_visible)| {
141 let parts: Vec<MessagePart> = if parts_json == "[]" {
142 vec![]
143 } else {
144 serde_json::from_str(&parts_json).unwrap_or_default()
145 };
146 Message {
147 role: parse_role(&role_str),
148 content,
149 parts,
150 metadata: MessageMetadata {
151 agent_visible: agent_visible != 0,
152 user_visible: user_visible != 0,
153 compacted_at: None,
154 },
155 }
156 },
157 )
158 .collect();
159 Ok(messages)
160 }
161
162 pub async fn load_history_filtered(
170 &self,
171 conversation_id: ConversationId,
172 limit: u32,
173 agent_visible: Option<bool>,
174 user_visible: Option<bool>,
175 ) -> Result<Vec<Message>, MemoryError> {
176 let av = agent_visible.map(i64::from);
177 let uv = user_visible.map(i64::from);
178
179 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
180 "WITH recent AS (\
181 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
182 WHERE conversation_id = ? \
183 AND deleted_at IS NULL \
184 AND (? IS NULL OR agent_visible = ?) \
185 AND (? IS NULL OR user_visible = ?) \
186 ORDER BY id DESC \
187 LIMIT ?\
188 ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
189 )
190 .bind(conversation_id)
191 .bind(av)
192 .bind(av)
193 .bind(uv)
194 .bind(uv)
195 .bind(limit)
196 .fetch_all(&self.pool)
197 .await?;
198
199 let messages = rows
200 .into_iter()
201 .map(
202 |(role_str, content, parts_json, agent_visible, user_visible)| {
203 let parts: Vec<MessagePart> = if parts_json == "[]" {
204 vec![]
205 } else {
206 serde_json::from_str(&parts_json).unwrap_or_default()
207 };
208 Message {
209 role: parse_role(&role_str),
210 content,
211 parts,
212 metadata: MessageMetadata {
213 agent_visible: agent_visible != 0,
214 user_visible: user_visible != 0,
215 compacted_at: None,
216 },
217 }
218 },
219 )
220 .collect();
221 Ok(messages)
222 }
223
224 pub async fn replace_conversation(
236 &self,
237 conversation_id: ConversationId,
238 compacted_range: std::ops::RangeInclusive<MessageId>,
239 summary_role: &str,
240 summary_content: &str,
241 ) -> Result<MessageId, MemoryError> {
242 let now = {
243 let secs = std::time::SystemTime::now()
244 .duration_since(std::time::UNIX_EPOCH)
245 .unwrap_or_default()
246 .as_secs();
247 format!("{secs}")
248 };
249 let start_id = compacted_range.start().0;
250 let end_id = compacted_range.end().0;
251
252 let mut tx = self.pool.begin().await?;
253
254 sqlx::query(
255 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
256 WHERE conversation_id = ? AND id >= ? AND id <= ?",
257 )
258 .bind(&now)
259 .bind(conversation_id)
260 .bind(start_id)
261 .bind(end_id)
262 .execute(&mut *tx)
263 .await?;
264
265 let row: (MessageId,) = sqlx::query_as(
266 "INSERT INTO messages \
267 (conversation_id, role, content, parts, agent_visible, user_visible) \
268 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
269 )
270 .bind(conversation_id)
271 .bind(summary_role)
272 .bind(summary_content)
273 .fetch_one(&mut *tx)
274 .await?;
275
276 tx.commit().await?;
277
278 Ok(row.0)
279 }
280
281 pub async fn oldest_message_ids(
287 &self,
288 conversation_id: ConversationId,
289 n: u32,
290 ) -> Result<Vec<MessageId>, MemoryError> {
291 let rows: Vec<(MessageId,)> = sqlx::query_as(
292 "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
293 )
294 .bind(conversation_id)
295 .bind(n)
296 .fetch_all(&self.pool)
297 .await?;
298 Ok(rows.into_iter().map(|r| r.0).collect())
299 }
300
301 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
307 let row: Option<(ConversationId,)> =
308 sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
309 .fetch_optional(&self.pool)
310 .await?;
311 Ok(row.map(|r| r.0))
312 }
313
314 pub async fn message_by_id(
320 &self,
321 message_id: MessageId,
322 ) -> Result<Option<Message>, MemoryError> {
323 let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
324 "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
325 )
326 .bind(message_id)
327 .fetch_optional(&self.pool)
328 .await?;
329
330 Ok(row.map(
331 |(role_str, content, parts_json, agent_visible, user_visible)| {
332 let parts: Vec<MessagePart> = if parts_json == "[]" {
333 vec![]
334 } else {
335 serde_json::from_str(&parts_json).unwrap_or_default()
336 };
337 Message {
338 role: parse_role(&role_str),
339 content,
340 parts,
341 metadata: MessageMetadata {
342 agent_visible: agent_visible != 0,
343 user_visible: user_visible != 0,
344 compacted_at: None,
345 },
346 }
347 },
348 ))
349 }
350
351 pub async fn messages_by_ids(
357 &self,
358 ids: &[MessageId],
359 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
360 if ids.is_empty() {
361 return Ok(Vec::new());
362 }
363
364 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
365
366 let query = format!(
367 "SELECT id, role, content, parts FROM messages \
368 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
369 );
370 let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
371 for &id in ids {
372 q = q.bind(id);
373 }
374
375 let rows = q.fetch_all(&self.pool).await?;
376
377 Ok(rows
378 .into_iter()
379 .map(|(id, role_str, content, parts_json)| {
380 let parts: Vec<MessagePart> = if parts_json == "[]" {
381 vec![]
382 } else {
383 serde_json::from_str(&parts_json).unwrap_or_default()
384 };
385 (
386 id,
387 Message {
388 role: parse_role(&role_str),
389 content,
390 parts,
391 metadata: MessageMetadata::default(),
392 },
393 )
394 })
395 .collect())
396 }
397
398 pub async fn unembedded_message_ids(
404 &self,
405 limit: Option<usize>,
406 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
407 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
408
409 let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
410 "SELECT m.id, m.conversation_id, m.role, m.content \
411 FROM messages m \
412 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
413 WHERE em.id IS NULL AND m.deleted_at IS NULL \
414 ORDER BY m.id ASC \
415 LIMIT ?",
416 )
417 .bind(effective_limit)
418 .fetch_all(&self.pool)
419 .await?;
420
421 Ok(rows)
422 }
423
424 pub async fn count_messages(
430 &self,
431 conversation_id: ConversationId,
432 ) -> Result<i64, MemoryError> {
433 let row: (i64,) = sqlx::query_as(
434 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
435 )
436 .bind(conversation_id)
437 .fetch_one(&self.pool)
438 .await?;
439 Ok(row.0)
440 }
441
442 pub async fn count_messages_after(
448 &self,
449 conversation_id: ConversationId,
450 after_id: MessageId,
451 ) -> Result<i64, MemoryError> {
452 let row: (i64,) =
453 sqlx::query_as(
454 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
455 )
456 .bind(conversation_id)
457 .bind(after_id)
458 .fetch_one(&self.pool)
459 .await?;
460 Ok(row.0)
461 }
462
463 pub async fn keyword_search(
472 &self,
473 query: &str,
474 limit: usize,
475 conversation_id: Option<ConversationId>,
476 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
477 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
478 let safe_query = sanitize_fts5_query(query);
479 if safe_query.is_empty() {
480 return Ok(Vec::new());
481 }
482
483 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
484 sqlx::query_as(
485 "SELECT m.id, -rank AS score \
486 FROM messages_fts f \
487 JOIN messages m ON m.id = f.rowid \
488 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
489 ORDER BY rank \
490 LIMIT ?",
491 )
492 .bind(&safe_query)
493 .bind(cid)
494 .bind(effective_limit)
495 .fetch_all(&self.pool)
496 .await?
497 } else {
498 sqlx::query_as(
499 "SELECT m.id, -rank AS score \
500 FROM messages_fts f \
501 JOIN messages m ON m.id = f.rowid \
502 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
503 ORDER BY rank \
504 LIMIT ?",
505 )
506 .bind(&safe_query)
507 .bind(effective_limit)
508 .fetch_all(&self.pool)
509 .await?
510 };
511
512 Ok(rows)
513 }
514
515 pub async fn message_timestamps(
523 &self,
524 ids: &[MessageId],
525 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
526 if ids.is_empty() {
527 return Ok(std::collections::HashMap::new());
528 }
529
530 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
531 let query = format!(
532 "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
533 FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
534 );
535 let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
536 for &id in ids {
537 q = q.bind(id);
538 }
539
540 let rows = q.fetch_all(&self.pool).await?;
541 Ok(rows.into_iter().collect())
542 }
543
544 pub async fn load_messages_range(
550 &self,
551 conversation_id: ConversationId,
552 after_message_id: MessageId,
553 limit: usize,
554 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
555 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
556
557 let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
558 "SELECT id, role, content FROM messages \
559 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
560 ORDER BY id ASC LIMIT ?",
561 )
562 .bind(conversation_id)
563 .bind(after_message_id)
564 .bind(effective_limit)
565 .fetch_all(&self.pool)
566 .await?;
567
568 Ok(rows)
569 }
570
571 pub async fn get_eviction_candidates(
579 &self,
580 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
581 let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
582 "SELECT id, created_at, last_accessed, access_count \
583 FROM messages WHERE deleted_at IS NULL",
584 )
585 .fetch_all(&self.pool)
586 .await?;
587
588 Ok(rows
589 .into_iter()
590 .map(
591 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
592 id,
593 created_at,
594 last_accessed,
595 access_count: access_count.try_into().unwrap_or(0),
596 },
597 )
598 .collect())
599 }
600
601 pub async fn soft_delete_messages(
609 &self,
610 ids: &[MessageId],
611 ) -> Result<(), crate::error::MemoryError> {
612 if ids.is_empty() {
613 return Ok(());
614 }
615 for &id in ids {
617 sqlx::query(
618 "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
619 )
620 .bind(id)
621 .execute(&self.pool)
622 .await?;
623 }
624 Ok(())
625 }
626
627 pub async fn get_soft_deleted_message_ids(
633 &self,
634 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
635 let rows: Vec<(MessageId,)> = sqlx::query_as(
636 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
637 )
638 .fetch_all(&self.pool)
639 .await?;
640 Ok(rows.into_iter().map(|(id,)| id).collect())
641 }
642
643 pub async fn mark_qdrant_cleaned(
649 &self,
650 ids: &[MessageId],
651 ) -> Result<(), crate::error::MemoryError> {
652 for &id in ids {
653 sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
654 .bind(id)
655 .execute(&self.pool)
656 .await?;
657 }
658 Ok(())
659 }
660}
661
662#[cfg(test)]
663mod tests {
664 use super::*;
665
666 async fn test_store() -> SqliteStore {
667 SqliteStore::new(":memory:").await.unwrap()
668 }
669
670 #[tokio::test]
671 async fn create_conversation_returns_id() {
672 let store = test_store().await;
673 let id1 = store.create_conversation().await.unwrap();
674 let id2 = store.create_conversation().await.unwrap();
675 assert_eq!(id1, ConversationId(1));
676 assert_eq!(id2, ConversationId(2));
677 }
678
679 #[tokio::test]
680 async fn save_and_load_messages() {
681 let store = test_store().await;
682 let cid = store.create_conversation().await.unwrap();
683
684 let msg_id1 = store.save_message(cid, "user", "hello").await.unwrap();
685 let msg_id2 = store
686 .save_message(cid, "assistant", "hi there")
687 .await
688 .unwrap();
689
690 assert_eq!(msg_id1, MessageId(1));
691 assert_eq!(msg_id2, MessageId(2));
692
693 let history = store.load_history(cid, 50).await.unwrap();
694 assert_eq!(history.len(), 2);
695 assert_eq!(history[0].role, Role::User);
696 assert_eq!(history[0].content, "hello");
697 assert_eq!(history[1].role, Role::Assistant);
698 assert_eq!(history[1].content, "hi there");
699 }
700
701 #[tokio::test]
702 async fn load_history_respects_limit() {
703 let store = test_store().await;
704 let cid = store.create_conversation().await.unwrap();
705
706 for i in 0..10 {
707 store
708 .save_message(cid, "user", &format!("msg {i}"))
709 .await
710 .unwrap();
711 }
712
713 let history = store.load_history(cid, 3).await.unwrap();
714 assert_eq!(history.len(), 3);
715 assert_eq!(history[0].content, "msg 7");
716 assert_eq!(history[1].content, "msg 8");
717 assert_eq!(history[2].content, "msg 9");
718 }
719
720 #[tokio::test]
721 async fn latest_conversation_id_empty() {
722 let store = test_store().await;
723 assert!(store.latest_conversation_id().await.unwrap().is_none());
724 }
725
726 #[tokio::test]
727 async fn latest_conversation_id_returns_newest() {
728 let store = test_store().await;
729 store.create_conversation().await.unwrap();
730 let id2 = store.create_conversation().await.unwrap();
731 assert_eq!(store.latest_conversation_id().await.unwrap(), Some(id2));
732 }
733
734 #[tokio::test]
735 async fn messages_isolated_per_conversation() {
736 let store = test_store().await;
737 let cid1 = store.create_conversation().await.unwrap();
738 let cid2 = store.create_conversation().await.unwrap();
739
740 store.save_message(cid1, "user", "conv1").await.unwrap();
741 store.save_message(cid2, "user", "conv2").await.unwrap();
742
743 let h1 = store.load_history(cid1, 50).await.unwrap();
744 let h2 = store.load_history(cid2, 50).await.unwrap();
745 assert_eq!(h1.len(), 1);
746 assert_eq!(h1[0].content, "conv1");
747 assert_eq!(h2.len(), 1);
748 assert_eq!(h2[0].content, "conv2");
749 }
750
751 #[tokio::test]
752 async fn pool_accessor_returns_valid_pool() {
753 let store = test_store().await;
754 let pool = store.pool();
755 let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(pool).await.unwrap();
756 assert_eq!(row.0, 1);
757 }
758
759 #[tokio::test]
760 async fn embeddings_metadata_table_exists() {
761 let store = test_store().await;
762 let result: (i64,) = sqlx::query_as(
763 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embeddings_metadata'",
764 )
765 .fetch_one(store.pool())
766 .await
767 .unwrap();
768 assert_eq!(result.0, 1);
769 }
770
771 #[tokio::test]
772 async fn cascade_delete_removes_embeddings_metadata() {
773 let store = test_store().await;
774 let pool = store.pool();
775
776 let cid = store.create_conversation().await.unwrap();
777 let msg_id = store.save_message(cid, "user", "test").await.unwrap();
778
779 let point_id = uuid::Uuid::new_v4().to_string();
780 sqlx::query(
781 "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
782 VALUES (?, ?, ?)",
783 )
784 .bind(msg_id)
785 .bind(&point_id)
786 .bind(768_i64)
787 .execute(pool)
788 .await
789 .unwrap();
790
791 let before: (i64,) =
792 sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
793 .bind(msg_id)
794 .fetch_one(pool)
795 .await
796 .unwrap();
797 assert_eq!(before.0, 1);
798
799 sqlx::query("DELETE FROM messages WHERE id = ?")
800 .bind(msg_id)
801 .execute(pool)
802 .await
803 .unwrap();
804
805 let after: (i64,) =
806 sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
807 .bind(msg_id)
808 .fetch_one(pool)
809 .await
810 .unwrap();
811 assert_eq!(after.0, 0);
812 }
813
814 #[tokio::test]
815 async fn messages_by_ids_batch_fetch() {
816 let store = test_store().await;
817 let cid = store.create_conversation().await.unwrap();
818 let id1 = store.save_message(cid, "user", "hello").await.unwrap();
819 let id2 = store.save_message(cid, "assistant", "hi").await.unwrap();
820 let _id3 = store.save_message(cid, "user", "bye").await.unwrap();
821
822 let results = store.messages_by_ids(&[id1, id2]).await.unwrap();
823 assert_eq!(results.len(), 2);
824 assert_eq!(results[0].0, id1);
825 assert_eq!(results[0].1.content, "hello");
826 assert_eq!(results[1].0, id2);
827 assert_eq!(results[1].1.content, "hi");
828 }
829
830 #[tokio::test]
831 async fn messages_by_ids_empty_input() {
832 let store = test_store().await;
833 let results = store.messages_by_ids(&[]).await.unwrap();
834 assert!(results.is_empty());
835 }
836
837 #[tokio::test]
838 async fn messages_by_ids_nonexistent() {
839 let store = test_store().await;
840 let results = store
841 .messages_by_ids(&[MessageId(999), MessageId(1000)])
842 .await
843 .unwrap();
844 assert!(results.is_empty());
845 }
846
847 #[tokio::test]
848 async fn message_by_id_fetches_existing() {
849 let store = test_store().await;
850 let cid = store.create_conversation().await.unwrap();
851 let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
852
853 let msg = store.message_by_id(msg_id).await.unwrap();
854 assert!(msg.is_some());
855 let msg = msg.unwrap();
856 assert_eq!(msg.role, Role::User);
857 assert_eq!(msg.content, "hello");
858 }
859
860 #[tokio::test]
861 async fn message_by_id_returns_none_for_nonexistent() {
862 let store = test_store().await;
863 let msg = store.message_by_id(MessageId(999)).await.unwrap();
864 assert!(msg.is_none());
865 }
866
867 #[tokio::test]
868 async fn unembedded_message_ids_returns_all_when_none_embedded() {
869 let store = test_store().await;
870 let cid = store.create_conversation().await.unwrap();
871
872 store.save_message(cid, "user", "msg1").await.unwrap();
873 store.save_message(cid, "assistant", "msg2").await.unwrap();
874
875 let unembedded = store.unembedded_message_ids(None).await.unwrap();
876 assert_eq!(unembedded.len(), 2);
877 assert_eq!(unembedded[0].3, "msg1");
878 assert_eq!(unembedded[1].3, "msg2");
879 }
880
881 #[tokio::test]
882 async fn unembedded_message_ids_excludes_embedded() {
883 let store = test_store().await;
884 let pool = store.pool();
885 let cid = store.create_conversation().await.unwrap();
886
887 let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
888 let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
889
890 let point_id = uuid::Uuid::new_v4().to_string();
891 sqlx::query(
892 "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
893 VALUES (?, ?, ?)",
894 )
895 .bind(msg_id1)
896 .bind(&point_id)
897 .bind(768_i64)
898 .execute(pool)
899 .await
900 .unwrap();
901
902 let unembedded = store.unembedded_message_ids(None).await.unwrap();
903 assert_eq!(unembedded.len(), 1);
904 assert_eq!(unembedded[0].0, msg_id2);
905 assert_eq!(unembedded[0].3, "msg2");
906 }
907
908 #[tokio::test]
909 async fn unembedded_message_ids_respects_limit() {
910 let store = test_store().await;
911 let cid = store.create_conversation().await.unwrap();
912
913 for i in 0..10 {
914 store
915 .save_message(cid, "user", &format!("msg{i}"))
916 .await
917 .unwrap();
918 }
919
920 let unembedded = store.unembedded_message_ids(Some(3)).await.unwrap();
921 assert_eq!(unembedded.len(), 3);
922 }
923
924 #[tokio::test]
925 async fn count_messages_returns_correct_count() {
926 let store = test_store().await;
927 let cid = store.create_conversation().await.unwrap();
928
929 assert_eq!(store.count_messages(cid).await.unwrap(), 0);
930
931 store.save_message(cid, "user", "msg1").await.unwrap();
932 store.save_message(cid, "assistant", "msg2").await.unwrap();
933
934 assert_eq!(store.count_messages(cid).await.unwrap(), 2);
935 }
936
937 #[tokio::test]
938 async fn count_messages_after_filters_correctly() {
939 let store = test_store().await;
940 let cid = store.create_conversation().await.unwrap();
941
942 let id1 = store.save_message(cid, "user", "msg1").await.unwrap();
943 let _id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
944 let id3 = store.save_message(cid, "user", "msg3").await.unwrap();
945
946 assert_eq!(
947 store.count_messages_after(cid, MessageId(0)).await.unwrap(),
948 3
949 );
950 assert_eq!(store.count_messages_after(cid, id1).await.unwrap(), 2);
951 assert_eq!(store.count_messages_after(cid, id3).await.unwrap(), 0);
952 }
953
954 #[tokio::test]
955 async fn load_messages_range_basic() {
956 let store = test_store().await;
957 let cid = store.create_conversation().await.unwrap();
958
959 let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
960 let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
961 let msg_id3 = store.save_message(cid, "user", "msg3").await.unwrap();
962
963 let msgs = store.load_messages_range(cid, msg_id1, 10).await.unwrap();
964 assert_eq!(msgs.len(), 2);
965 assert_eq!(msgs[0].0, msg_id2);
966 assert_eq!(msgs[0].2, "msg2");
967 assert_eq!(msgs[1].0, msg_id3);
968 assert_eq!(msgs[1].2, "msg3");
969 }
970
971 #[tokio::test]
972 async fn load_messages_range_respects_limit() {
973 let store = test_store().await;
974 let cid = store.create_conversation().await.unwrap();
975
976 store.save_message(cid, "user", "msg1").await.unwrap();
977 store.save_message(cid, "assistant", "msg2").await.unwrap();
978 store.save_message(cid, "user", "msg3").await.unwrap();
979
980 let msgs = store
981 .load_messages_range(cid, MessageId(0), 2)
982 .await
983 .unwrap();
984 assert_eq!(msgs.len(), 2);
985 }
986
987 #[tokio::test]
988 async fn keyword_search_basic() {
989 let store = test_store().await;
990 let cid = store.create_conversation().await.unwrap();
991
992 store
993 .save_message(cid, "user", "rust programming language")
994 .await
995 .unwrap();
996 store
997 .save_message(cid, "assistant", "python is great too")
998 .await
999 .unwrap();
1000 store
1001 .save_message(cid, "user", "I love rust and cargo")
1002 .await
1003 .unwrap();
1004
1005 let results = store.keyword_search("rust", 10, None).await.unwrap();
1006 assert_eq!(results.len(), 2);
1007 assert!(results.iter().all(|(_, score)| *score > 0.0));
1008 }
1009
1010 #[tokio::test]
1011 async fn keyword_search_with_conversation_filter() {
1012 let store = test_store().await;
1013 let cid1 = store.create_conversation().await.unwrap();
1014 let cid2 = store.create_conversation().await.unwrap();
1015
1016 store
1017 .save_message(cid1, "user", "hello world")
1018 .await
1019 .unwrap();
1020 store
1021 .save_message(cid2, "user", "hello universe")
1022 .await
1023 .unwrap();
1024
1025 let results = store.keyword_search("hello", 10, Some(cid1)).await.unwrap();
1026 assert_eq!(results.len(), 1);
1027 }
1028
1029 #[tokio::test]
1030 async fn keyword_search_no_match() {
1031 let store = test_store().await;
1032 let cid = store.create_conversation().await.unwrap();
1033
1034 store
1035 .save_message(cid, "user", "hello world")
1036 .await
1037 .unwrap();
1038
1039 let results = store.keyword_search("nonexistent", 10, None).await.unwrap();
1040 assert!(results.is_empty());
1041 }
1042
1043 #[tokio::test]
1044 async fn keyword_search_respects_limit() {
1045 let store = test_store().await;
1046 let cid = store.create_conversation().await.unwrap();
1047
1048 for i in 0..10 {
1049 store
1050 .save_message(cid, "user", &format!("test message {i}"))
1051 .await
1052 .unwrap();
1053 }
1054
1055 let results = store.keyword_search("test", 3, None).await.unwrap();
1056 assert_eq!(results.len(), 3);
1057 }
1058
1059 #[test]
1060 fn sanitize_fts5_query_strips_special_chars() {
1061 assert_eq!(sanitize_fts5_query("skill-audit"), "skill audit");
1062 assert_eq!(sanitize_fts5_query("hello, world"), "hello world");
1063 assert_eq!(sanitize_fts5_query("a+b*c^d"), "a b c d");
1064 assert_eq!(sanitize_fts5_query(" "), "");
1065 assert_eq!(sanitize_fts5_query("rust programming"), "rust programming");
1066 }
1067
1068 #[tokio::test]
1069 async fn keyword_search_with_special_chars_does_not_error() {
1070 let store = test_store().await;
1071 let cid = store.create_conversation().await.unwrap();
1072 store
1073 .save_message(cid, "user", "skill audit info")
1074 .await
1075 .unwrap();
1076 store
1079 .keyword_search("skill-audit, confidence=0.1", 10, None)
1080 .await
1081 .unwrap();
1082 }
1083
1084 #[tokio::test]
1085 async fn save_message_with_metadata_stores_visibility() {
1086 let store = test_store().await;
1087 let cid = store.create_conversation().await.unwrap();
1088
1089 let id = store
1090 .save_message_with_metadata(cid, "user", "hello", "[]", false, true)
1091 .await
1092 .unwrap();
1093
1094 let history = store.load_history(cid, 10).await.unwrap();
1095 assert_eq!(history.len(), 1);
1096 assert!(!history[0].metadata.agent_visible);
1097 assert!(history[0].metadata.user_visible);
1098 assert_eq!(id, MessageId(1));
1099 }
1100
1101 #[tokio::test]
1102 async fn load_history_filtered_by_agent_visible() {
1103 let store = test_store().await;
1104 let cid = store.create_conversation().await.unwrap();
1105
1106 store
1107 .save_message_with_metadata(cid, "user", "visible to agent", "[]", true, true)
1108 .await
1109 .unwrap();
1110 store
1111 .save_message_with_metadata(cid, "user", "user only", "[]", false, true)
1112 .await
1113 .unwrap();
1114
1115 let agent_msgs = store
1116 .load_history_filtered(cid, 50, Some(true), None)
1117 .await
1118 .unwrap();
1119 assert_eq!(agent_msgs.len(), 1);
1120 assert_eq!(agent_msgs[0].content, "visible to agent");
1121 }
1122
1123 #[tokio::test]
1124 async fn load_history_filtered_by_user_visible() {
1125 let store = test_store().await;
1126 let cid = store.create_conversation().await.unwrap();
1127
1128 store
1129 .save_message_with_metadata(cid, "system", "agent only summary", "[]", true, false)
1130 .await
1131 .unwrap();
1132 store
1133 .save_message_with_metadata(cid, "user", "user sees this", "[]", true, true)
1134 .await
1135 .unwrap();
1136
1137 let user_msgs = store
1138 .load_history_filtered(cid, 50, None, Some(true))
1139 .await
1140 .unwrap();
1141 assert_eq!(user_msgs.len(), 1);
1142 assert_eq!(user_msgs[0].content, "user sees this");
1143 }
1144
1145 #[tokio::test]
1146 async fn load_history_filtered_no_filter_returns_all() {
1147 let store = test_store().await;
1148 let cid = store.create_conversation().await.unwrap();
1149
1150 store
1151 .save_message_with_metadata(cid, "user", "msg1", "[]", true, false)
1152 .await
1153 .unwrap();
1154 store
1155 .save_message_with_metadata(cid, "user", "msg2", "[]", false, true)
1156 .await
1157 .unwrap();
1158
1159 let all_msgs = store
1160 .load_history_filtered(cid, 50, None, None)
1161 .await
1162 .unwrap();
1163 assert_eq!(all_msgs.len(), 2);
1164 }
1165
1166 #[tokio::test]
1167 async fn replace_conversation_marks_originals_and_inserts_summary() {
1168 let store = test_store().await;
1169 let cid = store.create_conversation().await.unwrap();
1170
1171 let id1 = store.save_message(cid, "user", "first").await.unwrap();
1172 let id2 = store
1173 .save_message(cid, "assistant", "second")
1174 .await
1175 .unwrap();
1176 let id3 = store.save_message(cid, "user", "third").await.unwrap();
1177
1178 let summary_id = store
1179 .replace_conversation(cid, id1..=id2, "system", "summary text")
1180 .await
1181 .unwrap();
1182
1183 let all = store.load_history(cid, 50).await.unwrap();
1185 let by_id1 = all.iter().find(|m| m.content == "first").unwrap();
1187 assert!(!by_id1.metadata.agent_visible);
1188 assert!(by_id1.metadata.user_visible);
1189
1190 let by_id2 = all.iter().find(|m| m.content == "second").unwrap();
1191 assert!(!by_id2.metadata.agent_visible);
1192
1193 let by_id3 = all.iter().find(|m| m.content == "third").unwrap();
1194 assert!(by_id3.metadata.agent_visible);
1195
1196 let summary = all.iter().find(|m| m.content == "summary text").unwrap();
1198 assert!(summary.metadata.agent_visible);
1199 assert!(!summary.metadata.user_visible);
1200 assert!(summary_id > id3);
1201 }
1202
1203 #[tokio::test]
1204 async fn oldest_message_ids_returns_in_order() {
1205 let store = test_store().await;
1206 let cid = store.create_conversation().await.unwrap();
1207
1208 let id1 = store.save_message(cid, "user", "a").await.unwrap();
1209 let id2 = store.save_message(cid, "assistant", "b").await.unwrap();
1210 let id3 = store.save_message(cid, "user", "c").await.unwrap();
1211
1212 let ids = store.oldest_message_ids(cid, 2).await.unwrap();
1213 assert_eq!(ids, vec![id1, id2]);
1214 assert!(ids[0] < ids[1]);
1215
1216 let all_ids = store.oldest_message_ids(cid, 10).await.unwrap();
1217 assert_eq!(all_ids, vec![id1, id2, id3]);
1218 }
1219
1220 #[tokio::test]
1221 async fn message_metadata_default_both_visible() {
1222 let store = test_store().await;
1223 let cid = store.create_conversation().await.unwrap();
1224
1225 store.save_message(cid, "user", "normal").await.unwrap();
1226
1227 let history = store.load_history(cid, 10).await.unwrap();
1228 assert!(history[0].metadata.agent_visible);
1229 assert!(history[0].metadata.user_visible);
1230 assert!(history[0].metadata.compacted_at.is_none());
1231 }
1232
1233 #[tokio::test]
1234 async fn load_history_empty_parts_json_fast_path() {
1235 let store = test_store().await;
1236 let cid = store.create_conversation().await.unwrap();
1237
1238 store
1239 .save_message_with_parts(cid, "user", "hello", "[]")
1240 .await
1241 .unwrap();
1242
1243 let history = store.load_history(cid, 10).await.unwrap();
1244 assert_eq!(history.len(), 1);
1245 assert!(
1246 history[0].parts.is_empty(),
1247 "\"[]\" fast-path must yield empty parts Vec"
1248 );
1249 }
1250
1251 #[tokio::test]
1252 async fn load_history_non_empty_parts_json_parsed() {
1253 let store = test_store().await;
1254 let cid = store.create_conversation().await.unwrap();
1255
1256 let parts_json = serde_json::to_string(&vec![MessagePart::ToolResult {
1257 tool_use_id: "t1".into(),
1258 content: "result".into(),
1259 is_error: false,
1260 }])
1261 .unwrap();
1262
1263 store
1264 .save_message_with_parts(cid, "user", "hello", &parts_json)
1265 .await
1266 .unwrap();
1267
1268 let history = store.load_history(cid, 10).await.unwrap();
1269 assert_eq!(history.len(), 1);
1270 assert_eq!(history[0].parts.len(), 1);
1271 assert!(
1272 matches!(&history[0].parts[0], MessagePart::ToolResult { content, .. } if content == "result")
1273 );
1274 }
1275
1276 #[tokio::test]
1277 async fn message_by_id_empty_parts_json_fast_path() {
1278 let store = test_store().await;
1279 let cid = store.create_conversation().await.unwrap();
1280
1281 let id = store
1282 .save_message_with_parts(cid, "user", "msg", "[]")
1283 .await
1284 .unwrap();
1285
1286 let msg = store.message_by_id(id).await.unwrap().unwrap();
1287 assert!(
1288 msg.parts.is_empty(),
1289 "\"[]\" fast-path must yield empty parts Vec in message_by_id"
1290 );
1291 }
1292
1293 #[tokio::test]
1294 async fn messages_by_ids_empty_parts_json_fast_path() {
1295 let store = test_store().await;
1296 let cid = store.create_conversation().await.unwrap();
1297
1298 let id = store
1299 .save_message_with_parts(cid, "user", "msg", "[]")
1300 .await
1301 .unwrap();
1302
1303 let results = store.messages_by_ids(&[id]).await.unwrap();
1304 assert_eq!(results.len(), 1);
1305 assert!(
1306 results[0].1.parts.is_empty(),
1307 "\"[]\" fast-path must yield empty parts Vec in messages_by_ids"
1308 );
1309 }
1310
1311 #[tokio::test]
1312 async fn load_history_filtered_empty_parts_json_fast_path() {
1313 let store = test_store().await;
1314 let cid = store.create_conversation().await.unwrap();
1315
1316 store
1317 .save_message_with_metadata(cid, "user", "msg", "[]", true, true)
1318 .await
1319 .unwrap();
1320
1321 let msgs = store
1322 .load_history_filtered(cid, 10, Some(true), None)
1323 .await
1324 .unwrap();
1325 assert_eq!(msgs.len(), 1);
1326 assert!(
1327 msgs[0].parts.is_empty(),
1328 "\"[]\" fast-path must yield empty parts Vec in load_history_filtered"
1329 );
1330 }
1331}