kernex_memory/store/
messages.rs1use super::Store;
4use kernex_core::{
5 error::KernexError,
6 message::{Request, Response},
7};
8use uuid::Uuid;
9
10impl Store {
11 pub async fn store_exchange(
16 &self,
17 channel: &str,
18 incoming: &Request,
19 response: &Response,
20 project: &str,
21 ) -> Result<(), KernexError> {
22 let conv_id = self
23 .get_or_create_conversation(channel, &incoming.sender_id, project)
24 .await?;
25
26 let user_id = Uuid::new_v4().to_string();
28 sqlx::query(
29 "INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
30 )
31 .bind(&user_id)
32 .bind(&conv_id)
33 .bind(&incoming.text)
34 .execute(&self.pool)
35 .await
36 .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
37
38 let asst_id = Uuid::new_v4().to_string();
40 let metadata_json = serde_json::to_string(&response.metadata)
41 .map_err(|e| KernexError::Store(format!("serialize failed: {e}")))?;
42
43 sqlx::query(
44 "INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
45 )
46 .bind(&asst_id)
47 .bind(&conv_id)
48 .bind(&response.text)
49 .bind(&metadata_json)
50 .execute(&self.pool)
51 .await
52 .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
53
54 Ok(())
55 }
56
57 pub async fn search_messages(
59 &self,
60 query: &str,
61 exclude_conversation_id: &str,
62 sender_id: &str,
63 limit: i64,
64 ) -> Result<Vec<(String, String, String)>, KernexError> {
65 if query.len() < 3 {
66 return Ok(Vec::new());
67 }
68
69 let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
72
73 let rows: Vec<(String, String, String)> = sqlx::query_as(
74 "SELECT m.role, m.content, m.timestamp \
75 FROM messages_fts fts \
76 JOIN messages m ON m.rowid = fts.rowid \
77 JOIN conversations c ON c.id = m.conversation_id \
78 WHERE messages_fts MATCH ? \
79 AND m.conversation_id != ? \
80 AND c.sender_id = ? \
81 ORDER BY rank \
82 LIMIT ?",
83 )
84 .bind(&sanitized)
85 .bind(exclude_conversation_id)
86 .bind(sender_id)
87 .bind(limit)
88 .fetch_all(&self.pool)
89 .await
90 .map_err(|e| KernexError::Store(format!("fts search failed: {e}")))?;
91
92 Ok(rows)
93 }
94}