1use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
10pub(crate) fn sanitize_fts5_query(query: &str) -> String {
20 query
21 .split(|c: char| !c.is_alphanumeric())
22 .filter(|t| !t.is_empty())
23 .collect::<Vec<_>>()
24 .join(" ")
25}
26
27fn parse_role(s: &str) -> Role {
28 match s {
29 "assistant" => Role::Assistant,
30 "system" => Role::System,
31 _ => Role::User,
32 }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37 match role {
38 Role::System => "system",
39 Role::User => "user",
40 Role::Assistant => "assistant",
41 }
42}
43
44impl SqliteStore {
45 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
51 let row: (ConversationId,) =
52 sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
53 .fetch_one(&self.pool)
54 .await?;
55 Ok(row.0)
56 }
57
58 pub async fn save_message(
64 &self,
65 conversation_id: ConversationId,
66 role: &str,
67 content: &str,
68 ) -> Result<MessageId, MemoryError> {
69 self.save_message_with_parts(conversation_id, role, content, "[]")
70 .await
71 }
72
73 pub async fn save_message_with_parts(
79 &self,
80 conversation_id: ConversationId,
81 role: &str,
82 content: &str,
83 parts_json: &str,
84 ) -> Result<MessageId, MemoryError> {
85 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
86 .await
87 }
88
89 pub async fn save_message_with_metadata(
95 &self,
96 conversation_id: ConversationId,
97 role: &str,
98 content: &str,
99 parts_json: &str,
100 agent_visible: bool,
101 user_visible: bool,
102 ) -> Result<MessageId, MemoryError> {
103 let row: (MessageId,) = sqlx::query_as(
104 "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible) \
105 VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
106 )
107 .bind(conversation_id)
108 .bind(role)
109 .bind(content)
110 .bind(parts_json)
111 .bind(i64::from(agent_visible))
112 .bind(i64::from(user_visible))
113 .fetch_one(&self.pool)
114 .await?;
115 Ok(row.0)
116 }
117
118 pub async fn load_history(
124 &self,
125 conversation_id: ConversationId,
126 limit: u32,
127 ) -> Result<Vec<Message>, MemoryError> {
128 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
129 "SELECT role, content, parts, agent_visible, user_visible FROM (\
130 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
131 WHERE conversation_id = ? AND deleted_at IS NULL \
132 ORDER BY id DESC \
133 LIMIT ?\
134 ) ORDER BY id ASC",
135 )
136 .bind(conversation_id)
137 .bind(limit)
138 .fetch_all(&self.pool)
139 .await?;
140
141 let messages = rows
142 .into_iter()
143 .map(
144 |(role_str, content, parts_json, agent_visible, user_visible)| {
145 let parts: Vec<MessagePart> = if parts_json == "[]" {
146 vec![]
147 } else {
148 serde_json::from_str(&parts_json).unwrap_or_default()
149 };
150 Message {
151 role: parse_role(&role_str),
152 content,
153 parts,
154 metadata: MessageMetadata {
155 agent_visible: agent_visible != 0,
156 user_visible: user_visible != 0,
157 compacted_at: None,
158 },
159 }
160 },
161 )
162 .collect();
163 Ok(messages)
164 }
165
166 pub async fn load_history_filtered(
174 &self,
175 conversation_id: ConversationId,
176 limit: u32,
177 agent_visible: Option<bool>,
178 user_visible: Option<bool>,
179 ) -> Result<Vec<Message>, MemoryError> {
180 let av = agent_visible.map(i64::from);
181 let uv = user_visible.map(i64::from);
182
183 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
184 "WITH recent AS (\
185 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
186 WHERE conversation_id = ? \
187 AND deleted_at IS NULL \
188 AND (? IS NULL OR agent_visible = ?) \
189 AND (? IS NULL OR user_visible = ?) \
190 ORDER BY id DESC \
191 LIMIT ?\
192 ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
193 )
194 .bind(conversation_id)
195 .bind(av)
196 .bind(av)
197 .bind(uv)
198 .bind(uv)
199 .bind(limit)
200 .fetch_all(&self.pool)
201 .await?;
202
203 let messages = rows
204 .into_iter()
205 .map(
206 |(role_str, content, parts_json, agent_visible, user_visible)| {
207 let parts: Vec<MessagePart> = if parts_json == "[]" {
208 vec![]
209 } else {
210 serde_json::from_str(&parts_json).unwrap_or_default()
211 };
212 Message {
213 role: parse_role(&role_str),
214 content,
215 parts,
216 metadata: MessageMetadata {
217 agent_visible: agent_visible != 0,
218 user_visible: user_visible != 0,
219 compacted_at: None,
220 },
221 }
222 },
223 )
224 .collect();
225 Ok(messages)
226 }
227
228 pub async fn replace_conversation(
240 &self,
241 conversation_id: ConversationId,
242 compacted_range: std::ops::RangeInclusive<MessageId>,
243 summary_role: &str,
244 summary_content: &str,
245 ) -> Result<MessageId, MemoryError> {
246 let now = {
247 let secs = std::time::SystemTime::now()
248 .duration_since(std::time::UNIX_EPOCH)
249 .unwrap_or_default()
250 .as_secs();
251 format!("{secs}")
252 };
253 let start_id = compacted_range.start().0;
254 let end_id = compacted_range.end().0;
255
256 let mut tx = self.pool.begin().await?;
257
258 sqlx::query(
259 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
260 WHERE conversation_id = ? AND id >= ? AND id <= ?",
261 )
262 .bind(&now)
263 .bind(conversation_id)
264 .bind(start_id)
265 .bind(end_id)
266 .execute(&mut *tx)
267 .await?;
268
269 let row: (MessageId,) = sqlx::query_as(
270 "INSERT INTO messages \
271 (conversation_id, role, content, parts, agent_visible, user_visible) \
272 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
273 )
274 .bind(conversation_id)
275 .bind(summary_role)
276 .bind(summary_content)
277 .fetch_one(&mut *tx)
278 .await?;
279
280 tx.commit().await?;
281
282 Ok(row.0)
283 }
284
285 pub async fn oldest_message_ids(
291 &self,
292 conversation_id: ConversationId,
293 n: u32,
294 ) -> Result<Vec<MessageId>, MemoryError> {
295 let rows: Vec<(MessageId,)> = sqlx::query_as(
296 "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
297 )
298 .bind(conversation_id)
299 .bind(n)
300 .fetch_all(&self.pool)
301 .await?;
302 Ok(rows.into_iter().map(|r| r.0).collect())
303 }
304
305 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
311 let row: Option<(ConversationId,)> =
312 sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
313 .fetch_optional(&self.pool)
314 .await?;
315 Ok(row.map(|r| r.0))
316 }
317
318 pub async fn message_by_id(
324 &self,
325 message_id: MessageId,
326 ) -> Result<Option<Message>, MemoryError> {
327 let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
328 "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
329 )
330 .bind(message_id)
331 .fetch_optional(&self.pool)
332 .await?;
333
334 Ok(row.map(
335 |(role_str, content, parts_json, agent_visible, user_visible)| {
336 let parts: Vec<MessagePart> = if parts_json == "[]" {
337 vec![]
338 } else {
339 serde_json::from_str(&parts_json).unwrap_or_default()
340 };
341 Message {
342 role: parse_role(&role_str),
343 content,
344 parts,
345 metadata: MessageMetadata {
346 agent_visible: agent_visible != 0,
347 user_visible: user_visible != 0,
348 compacted_at: None,
349 },
350 }
351 },
352 ))
353 }
354
355 pub async fn messages_by_ids(
361 &self,
362 ids: &[MessageId],
363 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
364 if ids.is_empty() {
365 return Ok(Vec::new());
366 }
367
368 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
369
370 let query = format!(
371 "SELECT id, role, content, parts FROM messages \
372 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
373 );
374 let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
375 for &id in ids {
376 q = q.bind(id);
377 }
378
379 let rows = q.fetch_all(&self.pool).await?;
380
381 Ok(rows
382 .into_iter()
383 .map(|(id, role_str, content, parts_json)| {
384 let parts: Vec<MessagePart> = if parts_json == "[]" {
385 vec![]
386 } else {
387 serde_json::from_str(&parts_json).unwrap_or_default()
388 };
389 (
390 id,
391 Message {
392 role: parse_role(&role_str),
393 content,
394 parts,
395 metadata: MessageMetadata::default(),
396 },
397 )
398 })
399 .collect())
400 }
401
402 pub async fn unembedded_message_ids(
408 &self,
409 limit: Option<usize>,
410 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
411 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
412
413 let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
414 "SELECT m.id, m.conversation_id, m.role, m.content \
415 FROM messages m \
416 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
417 WHERE em.id IS NULL AND m.deleted_at IS NULL \
418 ORDER BY m.id ASC \
419 LIMIT ?",
420 )
421 .bind(effective_limit)
422 .fetch_all(&self.pool)
423 .await?;
424
425 Ok(rows)
426 }
427
428 pub async fn count_messages(
434 &self,
435 conversation_id: ConversationId,
436 ) -> Result<i64, MemoryError> {
437 let row: (i64,) = sqlx::query_as(
438 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
439 )
440 .bind(conversation_id)
441 .fetch_one(&self.pool)
442 .await?;
443 Ok(row.0)
444 }
445
446 pub async fn count_messages_after(
452 &self,
453 conversation_id: ConversationId,
454 after_id: MessageId,
455 ) -> Result<i64, MemoryError> {
456 let row: (i64,) =
457 sqlx::query_as(
458 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
459 )
460 .bind(conversation_id)
461 .bind(after_id)
462 .fetch_one(&self.pool)
463 .await?;
464 Ok(row.0)
465 }
466
467 pub async fn keyword_search(
476 &self,
477 query: &str,
478 limit: usize,
479 conversation_id: Option<ConversationId>,
480 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
481 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
482 let safe_query = sanitize_fts5_query(query);
483 if safe_query.is_empty() {
484 return Ok(Vec::new());
485 }
486
487 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
488 sqlx::query_as(
489 "SELECT m.id, -rank AS score \
490 FROM messages_fts f \
491 JOIN messages m ON m.id = f.rowid \
492 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
493 ORDER BY rank \
494 LIMIT ?",
495 )
496 .bind(&safe_query)
497 .bind(cid)
498 .bind(effective_limit)
499 .fetch_all(&self.pool)
500 .await?
501 } else {
502 sqlx::query_as(
503 "SELECT m.id, -rank AS score \
504 FROM messages_fts f \
505 JOIN messages m ON m.id = f.rowid \
506 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
507 ORDER BY rank \
508 LIMIT ?",
509 )
510 .bind(&safe_query)
511 .bind(effective_limit)
512 .fetch_all(&self.pool)
513 .await?
514 };
515
516 Ok(rows)
517 }
518
519 pub async fn message_timestamps(
527 &self,
528 ids: &[MessageId],
529 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
530 if ids.is_empty() {
531 return Ok(std::collections::HashMap::new());
532 }
533
534 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
535 let query = format!(
536 "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
537 FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
538 );
539 let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
540 for &id in ids {
541 q = q.bind(id);
542 }
543
544 let rows = q.fetch_all(&self.pool).await?;
545 Ok(rows.into_iter().collect())
546 }
547
548 pub async fn load_messages_range(
554 &self,
555 conversation_id: ConversationId,
556 after_message_id: MessageId,
557 limit: usize,
558 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
559 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
560
561 let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
562 "SELECT id, role, content FROM messages \
563 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
564 ORDER BY id ASC LIMIT ?",
565 )
566 .bind(conversation_id)
567 .bind(after_message_id)
568 .bind(effective_limit)
569 .fetch_all(&self.pool)
570 .await?;
571
572 Ok(rows)
573 }
574
575 pub async fn get_eviction_candidates(
583 &self,
584 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
585 let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
586 "SELECT id, created_at, last_accessed, access_count \
587 FROM messages WHERE deleted_at IS NULL",
588 )
589 .fetch_all(&self.pool)
590 .await?;
591
592 Ok(rows
593 .into_iter()
594 .map(
595 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
596 id,
597 created_at,
598 last_accessed,
599 access_count: access_count.try_into().unwrap_or(0),
600 },
601 )
602 .collect())
603 }
604
605 pub async fn soft_delete_messages(
613 &self,
614 ids: &[MessageId],
615 ) -> Result<(), crate::error::MemoryError> {
616 if ids.is_empty() {
617 return Ok(());
618 }
619 for &id in ids {
621 sqlx::query(
622 "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
623 )
624 .bind(id)
625 .execute(&self.pool)
626 .await?;
627 }
628 Ok(())
629 }
630
631 pub async fn get_soft_deleted_message_ids(
637 &self,
638 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
639 let rows: Vec<(MessageId,)> = sqlx::query_as(
640 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
641 )
642 .fetch_all(&self.pool)
643 .await?;
644 Ok(rows.into_iter().map(|(id,)| id).collect())
645 }
646
647 pub async fn mark_qdrant_cleaned(
653 &self,
654 ids: &[MessageId],
655 ) -> Result<(), crate::error::MemoryError> {
656 for &id in ids {
657 sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
658 .bind(id)
659 .execute(&self.pool)
660 .await?;
661 }
662 Ok(())
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 async fn test_store() -> SqliteStore {
671 SqliteStore::new(":memory:").await.unwrap()
672 }
673
674 #[tokio::test]
675 async fn create_conversation_returns_id() {
676 let store = test_store().await;
677 let id1 = store.create_conversation().await.unwrap();
678 let id2 = store.create_conversation().await.unwrap();
679 assert_eq!(id1, ConversationId(1));
680 assert_eq!(id2, ConversationId(2));
681 }
682
683 #[tokio::test]
684 async fn save_and_load_messages() {
685 let store = test_store().await;
686 let cid = store.create_conversation().await.unwrap();
687
688 let msg_id1 = store.save_message(cid, "user", "hello").await.unwrap();
689 let msg_id2 = store
690 .save_message(cid, "assistant", "hi there")
691 .await
692 .unwrap();
693
694 assert_eq!(msg_id1, MessageId(1));
695 assert_eq!(msg_id2, MessageId(2));
696
697 let history = store.load_history(cid, 50).await.unwrap();
698 assert_eq!(history.len(), 2);
699 assert_eq!(history[0].role, Role::User);
700 assert_eq!(history[0].content, "hello");
701 assert_eq!(history[1].role, Role::Assistant);
702 assert_eq!(history[1].content, "hi there");
703 }
704
705 #[tokio::test]
706 async fn load_history_respects_limit() {
707 let store = test_store().await;
708 let cid = store.create_conversation().await.unwrap();
709
710 for i in 0..10 {
711 store
712 .save_message(cid, "user", &format!("msg {i}"))
713 .await
714 .unwrap();
715 }
716
717 let history = store.load_history(cid, 3).await.unwrap();
718 assert_eq!(history.len(), 3);
719 assert_eq!(history[0].content, "msg 7");
720 assert_eq!(history[1].content, "msg 8");
721 assert_eq!(history[2].content, "msg 9");
722 }
723
724 #[tokio::test]
725 async fn latest_conversation_id_empty() {
726 let store = test_store().await;
727 assert!(store.latest_conversation_id().await.unwrap().is_none());
728 }
729
730 #[tokio::test]
731 async fn latest_conversation_id_returns_newest() {
732 let store = test_store().await;
733 store.create_conversation().await.unwrap();
734 let id2 = store.create_conversation().await.unwrap();
735 assert_eq!(store.latest_conversation_id().await.unwrap(), Some(id2));
736 }
737
738 #[tokio::test]
739 async fn messages_isolated_per_conversation() {
740 let store = test_store().await;
741 let cid1 = store.create_conversation().await.unwrap();
742 let cid2 = store.create_conversation().await.unwrap();
743
744 store.save_message(cid1, "user", "conv1").await.unwrap();
745 store.save_message(cid2, "user", "conv2").await.unwrap();
746
747 let h1 = store.load_history(cid1, 50).await.unwrap();
748 let h2 = store.load_history(cid2, 50).await.unwrap();
749 assert_eq!(h1.len(), 1);
750 assert_eq!(h1[0].content, "conv1");
751 assert_eq!(h2.len(), 1);
752 assert_eq!(h2[0].content, "conv2");
753 }
754
755 #[tokio::test]
756 async fn pool_accessor_returns_valid_pool() {
757 let store = test_store().await;
758 let pool = store.pool();
759 let row: (i64,) = sqlx::query_as("SELECT 1").fetch_one(pool).await.unwrap();
760 assert_eq!(row.0, 1);
761 }
762
763 #[tokio::test]
764 async fn embeddings_metadata_table_exists() {
765 let store = test_store().await;
766 let result: (i64,) = sqlx::query_as(
767 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embeddings_metadata'",
768 )
769 .fetch_one(store.pool())
770 .await
771 .unwrap();
772 assert_eq!(result.0, 1);
773 }
774
775 #[tokio::test]
776 async fn cascade_delete_removes_embeddings_metadata() {
777 let store = test_store().await;
778 let pool = store.pool();
779
780 let cid = store.create_conversation().await.unwrap();
781 let msg_id = store.save_message(cid, "user", "test").await.unwrap();
782
783 let point_id = uuid::Uuid::new_v4().to_string();
784 sqlx::query(
785 "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
786 VALUES (?, ?, ?)",
787 )
788 .bind(msg_id)
789 .bind(&point_id)
790 .bind(768_i64)
791 .execute(pool)
792 .await
793 .unwrap();
794
795 let before: (i64,) =
796 sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
797 .bind(msg_id)
798 .fetch_one(pool)
799 .await
800 .unwrap();
801 assert_eq!(before.0, 1);
802
803 sqlx::query("DELETE FROM messages WHERE id = ?")
804 .bind(msg_id)
805 .execute(pool)
806 .await
807 .unwrap();
808
809 let after: (i64,) =
810 sqlx::query_as("SELECT COUNT(*) FROM embeddings_metadata WHERE message_id = ?")
811 .bind(msg_id)
812 .fetch_one(pool)
813 .await
814 .unwrap();
815 assert_eq!(after.0, 0);
816 }
817
818 #[tokio::test]
819 async fn messages_by_ids_batch_fetch() {
820 let store = test_store().await;
821 let cid = store.create_conversation().await.unwrap();
822 let id1 = store.save_message(cid, "user", "hello").await.unwrap();
823 let id2 = store.save_message(cid, "assistant", "hi").await.unwrap();
824 let _id3 = store.save_message(cid, "user", "bye").await.unwrap();
825
826 let results = store.messages_by_ids(&[id1, id2]).await.unwrap();
827 assert_eq!(results.len(), 2);
828 assert_eq!(results[0].0, id1);
829 assert_eq!(results[0].1.content, "hello");
830 assert_eq!(results[1].0, id2);
831 assert_eq!(results[1].1.content, "hi");
832 }
833
834 #[tokio::test]
835 async fn messages_by_ids_empty_input() {
836 let store = test_store().await;
837 let results = store.messages_by_ids(&[]).await.unwrap();
838 assert!(results.is_empty());
839 }
840
841 #[tokio::test]
842 async fn messages_by_ids_nonexistent() {
843 let store = test_store().await;
844 let results = store
845 .messages_by_ids(&[MessageId(999), MessageId(1000)])
846 .await
847 .unwrap();
848 assert!(results.is_empty());
849 }
850
851 #[tokio::test]
852 async fn message_by_id_fetches_existing() {
853 let store = test_store().await;
854 let cid = store.create_conversation().await.unwrap();
855 let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
856
857 let msg = store.message_by_id(msg_id).await.unwrap();
858 assert!(msg.is_some());
859 let msg = msg.unwrap();
860 assert_eq!(msg.role, Role::User);
861 assert_eq!(msg.content, "hello");
862 }
863
864 #[tokio::test]
865 async fn message_by_id_returns_none_for_nonexistent() {
866 let store = test_store().await;
867 let msg = store.message_by_id(MessageId(999)).await.unwrap();
868 assert!(msg.is_none());
869 }
870
871 #[tokio::test]
872 async fn unembedded_message_ids_returns_all_when_none_embedded() {
873 let store = test_store().await;
874 let cid = store.create_conversation().await.unwrap();
875
876 store.save_message(cid, "user", "msg1").await.unwrap();
877 store.save_message(cid, "assistant", "msg2").await.unwrap();
878
879 let unembedded = store.unembedded_message_ids(None).await.unwrap();
880 assert_eq!(unembedded.len(), 2);
881 assert_eq!(unembedded[0].3, "msg1");
882 assert_eq!(unembedded[1].3, "msg2");
883 }
884
885 #[tokio::test]
886 async fn unembedded_message_ids_excludes_embedded() {
887 let store = test_store().await;
888 let pool = store.pool();
889 let cid = store.create_conversation().await.unwrap();
890
891 let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
892 let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
893
894 let point_id = uuid::Uuid::new_v4().to_string();
895 sqlx::query(
896 "INSERT INTO embeddings_metadata (message_id, qdrant_point_id, dimensions) \
897 VALUES (?, ?, ?)",
898 )
899 .bind(msg_id1)
900 .bind(&point_id)
901 .bind(768_i64)
902 .execute(pool)
903 .await
904 .unwrap();
905
906 let unembedded = store.unembedded_message_ids(None).await.unwrap();
907 assert_eq!(unembedded.len(), 1);
908 assert_eq!(unembedded[0].0, msg_id2);
909 assert_eq!(unembedded[0].3, "msg2");
910 }
911
912 #[tokio::test]
913 async fn unembedded_message_ids_respects_limit() {
914 let store = test_store().await;
915 let cid = store.create_conversation().await.unwrap();
916
917 for i in 0..10 {
918 store
919 .save_message(cid, "user", &format!("msg{i}"))
920 .await
921 .unwrap();
922 }
923
924 let unembedded = store.unembedded_message_ids(Some(3)).await.unwrap();
925 assert_eq!(unembedded.len(), 3);
926 }
927
928 #[tokio::test]
929 async fn count_messages_returns_correct_count() {
930 let store = test_store().await;
931 let cid = store.create_conversation().await.unwrap();
932
933 assert_eq!(store.count_messages(cid).await.unwrap(), 0);
934
935 store.save_message(cid, "user", "msg1").await.unwrap();
936 store.save_message(cid, "assistant", "msg2").await.unwrap();
937
938 assert_eq!(store.count_messages(cid).await.unwrap(), 2);
939 }
940
941 #[tokio::test]
942 async fn count_messages_after_filters_correctly() {
943 let store = test_store().await;
944 let cid = store.create_conversation().await.unwrap();
945
946 let id1 = store.save_message(cid, "user", "msg1").await.unwrap();
947 let _id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
948 let id3 = store.save_message(cid, "user", "msg3").await.unwrap();
949
950 assert_eq!(
951 store.count_messages_after(cid, MessageId(0)).await.unwrap(),
952 3
953 );
954 assert_eq!(store.count_messages_after(cid, id1).await.unwrap(), 2);
955 assert_eq!(store.count_messages_after(cid, id3).await.unwrap(), 0);
956 }
957
958 #[tokio::test]
959 async fn load_messages_range_basic() {
960 let store = test_store().await;
961 let cid = store.create_conversation().await.unwrap();
962
963 let msg_id1 = store.save_message(cid, "user", "msg1").await.unwrap();
964 let msg_id2 = store.save_message(cid, "assistant", "msg2").await.unwrap();
965 let msg_id3 = store.save_message(cid, "user", "msg3").await.unwrap();
966
967 let msgs = store.load_messages_range(cid, msg_id1, 10).await.unwrap();
968 assert_eq!(msgs.len(), 2);
969 assert_eq!(msgs[0].0, msg_id2);
970 assert_eq!(msgs[0].2, "msg2");
971 assert_eq!(msgs[1].0, msg_id3);
972 assert_eq!(msgs[1].2, "msg3");
973 }
974
975 #[tokio::test]
976 async fn load_messages_range_respects_limit() {
977 let store = test_store().await;
978 let cid = store.create_conversation().await.unwrap();
979
980 store.save_message(cid, "user", "msg1").await.unwrap();
981 store.save_message(cid, "assistant", "msg2").await.unwrap();
982 store.save_message(cid, "user", "msg3").await.unwrap();
983
984 let msgs = store
985 .load_messages_range(cid, MessageId(0), 2)
986 .await
987 .unwrap();
988 assert_eq!(msgs.len(), 2);
989 }
990
991 #[tokio::test]
992 async fn keyword_search_basic() {
993 let store = test_store().await;
994 let cid = store.create_conversation().await.unwrap();
995
996 store
997 .save_message(cid, "user", "rust programming language")
998 .await
999 .unwrap();
1000 store
1001 .save_message(cid, "assistant", "python is great too")
1002 .await
1003 .unwrap();
1004 store
1005 .save_message(cid, "user", "I love rust and cargo")
1006 .await
1007 .unwrap();
1008
1009 let results = store.keyword_search("rust", 10, None).await.unwrap();
1010 assert_eq!(results.len(), 2);
1011 assert!(results.iter().all(|(_, score)| *score > 0.0));
1012 }
1013
1014 #[tokio::test]
1015 async fn keyword_search_with_conversation_filter() {
1016 let store = test_store().await;
1017 let cid1 = store.create_conversation().await.unwrap();
1018 let cid2 = store.create_conversation().await.unwrap();
1019
1020 store
1021 .save_message(cid1, "user", "hello world")
1022 .await
1023 .unwrap();
1024 store
1025 .save_message(cid2, "user", "hello universe")
1026 .await
1027 .unwrap();
1028
1029 let results = store.keyword_search("hello", 10, Some(cid1)).await.unwrap();
1030 assert_eq!(results.len(), 1);
1031 }
1032
1033 #[tokio::test]
1034 async fn keyword_search_no_match() {
1035 let store = test_store().await;
1036 let cid = store.create_conversation().await.unwrap();
1037
1038 store
1039 .save_message(cid, "user", "hello world")
1040 .await
1041 .unwrap();
1042
1043 let results = store.keyword_search("nonexistent", 10, None).await.unwrap();
1044 assert!(results.is_empty());
1045 }
1046
1047 #[tokio::test]
1048 async fn keyword_search_respects_limit() {
1049 let store = test_store().await;
1050 let cid = store.create_conversation().await.unwrap();
1051
1052 for i in 0..10 {
1053 store
1054 .save_message(cid, "user", &format!("test message {i}"))
1055 .await
1056 .unwrap();
1057 }
1058
1059 let results = store.keyword_search("test", 3, None).await.unwrap();
1060 assert_eq!(results.len(), 3);
1061 }
1062
1063 #[test]
1064 fn sanitize_fts5_query_strips_special_chars() {
1065 assert_eq!(sanitize_fts5_query("skill-audit"), "skill audit");
1066 assert_eq!(sanitize_fts5_query("hello, world"), "hello world");
1067 assert_eq!(sanitize_fts5_query("a+b*c^d"), "a b c d");
1068 assert_eq!(sanitize_fts5_query(" "), "");
1069 assert_eq!(sanitize_fts5_query("rust programming"), "rust programming");
1070 }
1071
1072 #[tokio::test]
1073 async fn keyword_search_with_special_chars_does_not_error() {
1074 let store = test_store().await;
1075 let cid = store.create_conversation().await.unwrap();
1076 store
1077 .save_message(cid, "user", "skill audit info")
1078 .await
1079 .unwrap();
1080 store
1083 .keyword_search("skill-audit, confidence=0.1", 10, None)
1084 .await
1085 .unwrap();
1086 }
1087
1088 #[tokio::test]
1089 async fn save_message_with_metadata_stores_visibility() {
1090 let store = test_store().await;
1091 let cid = store.create_conversation().await.unwrap();
1092
1093 let id = store
1094 .save_message_with_metadata(cid, "user", "hello", "[]", false, true)
1095 .await
1096 .unwrap();
1097
1098 let history = store.load_history(cid, 10).await.unwrap();
1099 assert_eq!(history.len(), 1);
1100 assert!(!history[0].metadata.agent_visible);
1101 assert!(history[0].metadata.user_visible);
1102 assert_eq!(id, MessageId(1));
1103 }
1104
1105 #[tokio::test]
1106 async fn load_history_filtered_by_agent_visible() {
1107 let store = test_store().await;
1108 let cid = store.create_conversation().await.unwrap();
1109
1110 store
1111 .save_message_with_metadata(cid, "user", "visible to agent", "[]", true, true)
1112 .await
1113 .unwrap();
1114 store
1115 .save_message_with_metadata(cid, "user", "user only", "[]", false, true)
1116 .await
1117 .unwrap();
1118
1119 let agent_msgs = store
1120 .load_history_filtered(cid, 50, Some(true), None)
1121 .await
1122 .unwrap();
1123 assert_eq!(agent_msgs.len(), 1);
1124 assert_eq!(agent_msgs[0].content, "visible to agent");
1125 }
1126
1127 #[tokio::test]
1128 async fn load_history_filtered_by_user_visible() {
1129 let store = test_store().await;
1130 let cid = store.create_conversation().await.unwrap();
1131
1132 store
1133 .save_message_with_metadata(cid, "system", "agent only summary", "[]", true, false)
1134 .await
1135 .unwrap();
1136 store
1137 .save_message_with_metadata(cid, "user", "user sees this", "[]", true, true)
1138 .await
1139 .unwrap();
1140
1141 let user_msgs = store
1142 .load_history_filtered(cid, 50, None, Some(true))
1143 .await
1144 .unwrap();
1145 assert_eq!(user_msgs.len(), 1);
1146 assert_eq!(user_msgs[0].content, "user sees this");
1147 }
1148
1149 #[tokio::test]
1150 async fn load_history_filtered_no_filter_returns_all() {
1151 let store = test_store().await;
1152 let cid = store.create_conversation().await.unwrap();
1153
1154 store
1155 .save_message_with_metadata(cid, "user", "msg1", "[]", true, false)
1156 .await
1157 .unwrap();
1158 store
1159 .save_message_with_metadata(cid, "user", "msg2", "[]", false, true)
1160 .await
1161 .unwrap();
1162
1163 let all_msgs = store
1164 .load_history_filtered(cid, 50, None, None)
1165 .await
1166 .unwrap();
1167 assert_eq!(all_msgs.len(), 2);
1168 }
1169
1170 #[tokio::test]
1171 async fn replace_conversation_marks_originals_and_inserts_summary() {
1172 let store = test_store().await;
1173 let cid = store.create_conversation().await.unwrap();
1174
1175 let id1 = store.save_message(cid, "user", "first").await.unwrap();
1176 let id2 = store
1177 .save_message(cid, "assistant", "second")
1178 .await
1179 .unwrap();
1180 let id3 = store.save_message(cid, "user", "third").await.unwrap();
1181
1182 let summary_id = store
1183 .replace_conversation(cid, id1..=id2, "system", "summary text")
1184 .await
1185 .unwrap();
1186
1187 let all = store.load_history(cid, 50).await.unwrap();
1189 let by_id1 = all.iter().find(|m| m.content == "first").unwrap();
1191 assert!(!by_id1.metadata.agent_visible);
1192 assert!(by_id1.metadata.user_visible);
1193
1194 let by_id2 = all.iter().find(|m| m.content == "second").unwrap();
1195 assert!(!by_id2.metadata.agent_visible);
1196
1197 let by_id3 = all.iter().find(|m| m.content == "third").unwrap();
1198 assert!(by_id3.metadata.agent_visible);
1199
1200 let summary = all.iter().find(|m| m.content == "summary text").unwrap();
1202 assert!(summary.metadata.agent_visible);
1203 assert!(!summary.metadata.user_visible);
1204 assert!(summary_id > id3);
1205 }
1206
1207 #[tokio::test]
1208 async fn oldest_message_ids_returns_in_order() {
1209 let store = test_store().await;
1210 let cid = store.create_conversation().await.unwrap();
1211
1212 let id1 = store.save_message(cid, "user", "a").await.unwrap();
1213 let id2 = store.save_message(cid, "assistant", "b").await.unwrap();
1214 let id3 = store.save_message(cid, "user", "c").await.unwrap();
1215
1216 let ids = store.oldest_message_ids(cid, 2).await.unwrap();
1217 assert_eq!(ids, vec![id1, id2]);
1218 assert!(ids[0] < ids[1]);
1219
1220 let all_ids = store.oldest_message_ids(cid, 10).await.unwrap();
1221 assert_eq!(all_ids, vec![id1, id2, id3]);
1222 }
1223
1224 #[tokio::test]
1225 async fn message_metadata_default_both_visible() {
1226 let store = test_store().await;
1227 let cid = store.create_conversation().await.unwrap();
1228
1229 store.save_message(cid, "user", "normal").await.unwrap();
1230
1231 let history = store.load_history(cid, 10).await.unwrap();
1232 assert!(history[0].metadata.agent_visible);
1233 assert!(history[0].metadata.user_visible);
1234 assert!(history[0].metadata.compacted_at.is_none());
1235 }
1236
1237 #[tokio::test]
1238 async fn load_history_empty_parts_json_fast_path() {
1239 let store = test_store().await;
1240 let cid = store.create_conversation().await.unwrap();
1241
1242 store
1243 .save_message_with_parts(cid, "user", "hello", "[]")
1244 .await
1245 .unwrap();
1246
1247 let history = store.load_history(cid, 10).await.unwrap();
1248 assert_eq!(history.len(), 1);
1249 assert!(
1250 history[0].parts.is_empty(),
1251 "\"[]\" fast-path must yield empty parts Vec"
1252 );
1253 }
1254
1255 #[tokio::test]
1256 async fn load_history_non_empty_parts_json_parsed() {
1257 let store = test_store().await;
1258 let cid = store.create_conversation().await.unwrap();
1259
1260 let parts_json = serde_json::to_string(&vec![MessagePart::ToolResult {
1261 tool_use_id: "t1".into(),
1262 content: "result".into(),
1263 is_error: false,
1264 }])
1265 .unwrap();
1266
1267 store
1268 .save_message_with_parts(cid, "user", "hello", &parts_json)
1269 .await
1270 .unwrap();
1271
1272 let history = store.load_history(cid, 10).await.unwrap();
1273 assert_eq!(history.len(), 1);
1274 assert_eq!(history[0].parts.len(), 1);
1275 assert!(
1276 matches!(&history[0].parts[0], MessagePart::ToolResult { content, .. } if content == "result")
1277 );
1278 }
1279
1280 #[tokio::test]
1281 async fn message_by_id_empty_parts_json_fast_path() {
1282 let store = test_store().await;
1283 let cid = store.create_conversation().await.unwrap();
1284
1285 let id = store
1286 .save_message_with_parts(cid, "user", "msg", "[]")
1287 .await
1288 .unwrap();
1289
1290 let msg = store.message_by_id(id).await.unwrap().unwrap();
1291 assert!(
1292 msg.parts.is_empty(),
1293 "\"[]\" fast-path must yield empty parts Vec in message_by_id"
1294 );
1295 }
1296
1297 #[tokio::test]
1298 async fn messages_by_ids_empty_parts_json_fast_path() {
1299 let store = test_store().await;
1300 let cid = store.create_conversation().await.unwrap();
1301
1302 let id = store
1303 .save_message_with_parts(cid, "user", "msg", "[]")
1304 .await
1305 .unwrap();
1306
1307 let results = store.messages_by_ids(&[id]).await.unwrap();
1308 assert_eq!(results.len(), 1);
1309 assert!(
1310 results[0].1.parts.is_empty(),
1311 "\"[]\" fast-path must yield empty parts Vec in messages_by_ids"
1312 );
1313 }
1314
1315 #[tokio::test]
1316 async fn load_history_filtered_empty_parts_json_fast_path() {
1317 let store = test_store().await;
1318 let cid = store.create_conversation().await.unwrap();
1319
1320 store
1321 .save_message_with_metadata(cid, "user", "msg", "[]", true, true)
1322 .await
1323 .unwrap();
1324
1325 let msgs = store
1326 .load_history_filtered(cid, 10, Some(true), None)
1327 .await
1328 .unwrap();
1329 assert_eq!(msgs.len(), 1);
1330 assert!(
1331 msgs[0].parts.is_empty(),
1332 "\"[]\" fast-path must yield empty parts Vec in load_history_filtered"
1333 );
1334 }
1335}