1use std::time::SystemTime;
4
5use super::Store;
6use crate::error::MemoryError;
7use crate::types::{format_sqlite_timestamp, parse_sqlite_timestamp, MessageRow};
8use kernex_core::message::{Request, Response};
9use uuid::Uuid;
10
11impl Store {
12 pub async fn store_exchange(
17 &self,
18 channel: &str,
19 incoming: &Request,
20 response: &Response,
21 project: &str,
22 ) -> Result<(), MemoryError> {
23 let conv_id = self
24 .get_or_create_conversation(channel, &incoming.sender_id, project)
25 .await?;
26
27 let user_id = Uuid::new_v4().to_string();
29 sqlx::query(
30 "INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
31 )
32 .bind(&user_id)
33 .bind(&conv_id)
34 .bind(&incoming.text)
35 .execute(&self.pool)
36 .await
37 .map_err(|e| MemoryError::sqlite("insert failed", e))?;
38
39 let asst_id = Uuid::new_v4().to_string();
41 let metadata_json = serde_json::to_string(&response.metadata)
42 .map_err(|e| MemoryError::serde("serialize failed", e))?;
43
44 sqlx::query(
45 "INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
46 )
47 .bind(&asst_id)
48 .bind(&conv_id)
49 .bind(&response.text)
50 .bind(&metadata_json)
51 .execute(&self.pool)
52 .await
53 .map_err(|e| MemoryError::sqlite("insert failed", e))?;
54
55 Ok(())
56 }
57
58 pub async fn get_message_by_id(&self, id: &str) -> Result<Option<MessageRow>, MemoryError> {
62 let row: Option<(String, String, String, String, String)> = sqlx::query_as(
63 "SELECT id, conversation_id, role, content, timestamp \
64 FROM messages WHERE id = ?",
65 )
66 .bind(id)
67 .fetch_optional(&self.pool)
68 .await
69 .map_err(|e| MemoryError::sqlite("get_message_by_id failed", e))?;
70
71 match row {
72 Some((id, conversation_id, role, content, timestamp)) => Ok(Some(MessageRow {
73 id,
74 conversation_id,
75 role,
76 content,
77 timestamp: parse_sqlite_timestamp(×tamp)?,
78 })),
79 None => Ok(None),
80 }
81 }
82
83 pub async fn search_messages(
89 &self,
90 query: &str,
91 exclude_conversation_id: &str,
92 sender_id: &str,
93 limit: i64,
94 since: Option<SystemTime>,
95 ) -> Result<Vec<MessageRow>, MemoryError> {
96 if query.len() < 3 {
97 return Ok(Vec::new());
98 }
99
100 let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
103
104 let rows: Vec<(String, String, String, String, String)> = if let Some(cutoff) = since {
108 let cutoff_str = format_sqlite_timestamp(cutoff);
109 sqlx::query_as(
110 "SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
111 FROM messages_fts fts \
112 JOIN messages m ON m.rowid = fts.rowid \
113 JOIN conversations c ON c.id = m.conversation_id \
114 WHERE messages_fts MATCH ? \
115 AND m.conversation_id != ? \
116 AND c.sender_id = ? \
117 AND m.timestamp >= ? \
118 ORDER BY rank \
119 LIMIT ?",
120 )
121 .bind(&sanitized)
122 .bind(exclude_conversation_id)
123 .bind(sender_id)
124 .bind(&cutoff_str)
125 .bind(limit)
126 .fetch_all(&self.pool)
127 .await
128 } else {
129 sqlx::query_as(
130 "SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
131 FROM messages_fts fts \
132 JOIN messages m ON m.rowid = fts.rowid \
133 JOIN conversations c ON c.id = m.conversation_id \
134 WHERE messages_fts MATCH ? \
135 AND m.conversation_id != ? \
136 AND c.sender_id = ? \
137 ORDER BY rank \
138 LIMIT ?",
139 )
140 .bind(&sanitized)
141 .bind(exclude_conversation_id)
142 .bind(sender_id)
143 .bind(limit)
144 .fetch_all(&self.pool)
145 .await
146 }
147 .map_err(|e| MemoryError::sqlite("fts search failed", e))?;
148
149 let mut out = Vec::with_capacity(rows.len());
150 for (id, conversation_id, role, content, timestamp) in rows {
151 out.push(MessageRow {
152 id,
153 conversation_id,
154 role,
155 content,
156 timestamp: parse_sqlite_timestamp(×tamp)?,
157 });
158 }
159 Ok(out)
160 }
161}