use std::time::SystemTime;
use super::Store;
use crate::error::MemoryError;
use crate::types::{format_sqlite_timestamp, parse_sqlite_timestamp, MessageRow};
use kernex_core::message::{Request, Response};
use uuid::Uuid;
impl Store {
pub async fn store_exchange(
&self,
channel: &str,
incoming: &Request,
response: &Response,
project: &str,
) -> Result<(), MemoryError> {
let conv_id = self
.get_or_create_conversation(channel, &incoming.sender_id, project)
.await?;
let user_id = Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
)
.bind(&user_id)
.bind(&conv_id)
.bind(&incoming.text)
.execute(&self.pool)
.await
.map_err(|e| MemoryError::sqlite("insert failed", e))?;
let asst_id = Uuid::new_v4().to_string();
let metadata_json = serde_json::to_string(&response.metadata)
.map_err(|e| MemoryError::serde("serialize failed", e))?;
sqlx::query(
"INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
)
.bind(&asst_id)
.bind(&conv_id)
.bind(&response.text)
.bind(&metadata_json)
.execute(&self.pool)
.await
.map_err(|e| MemoryError::sqlite("insert failed", e))?;
Ok(())
}
pub async fn get_message_by_id(&self, id: &str) -> Result<Option<MessageRow>, MemoryError> {
let row: Option<(String, String, String, String, String)> = sqlx::query_as(
"SELECT id, conversation_id, role, content, timestamp \
FROM messages WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await
.map_err(|e| MemoryError::sqlite("get_message_by_id failed", e))?;
match row {
Some((id, conversation_id, role, content, timestamp)) => Ok(Some(MessageRow {
id,
conversation_id,
role,
content,
timestamp: parse_sqlite_timestamp(×tamp)?,
})),
None => Ok(None),
}
}
pub async fn search_messages(
&self,
query: &str,
exclude_conversation_id: &str,
sender_id: &str,
limit: i64,
since: Option<SystemTime>,
) -> Result<Vec<MessageRow>, MemoryError> {
if query.len() < 3 {
return Ok(Vec::new());
}
let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
let rows: Vec<(String, String, String, String, String)> = if let Some(cutoff) = since {
let cutoff_str = format_sqlite_timestamp(cutoff);
sqlx::query_as(
"SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
FROM messages_fts fts \
JOIN messages m ON m.rowid = fts.rowid \
JOIN conversations c ON c.id = m.conversation_id \
WHERE messages_fts MATCH ? \
AND m.conversation_id != ? \
AND c.sender_id = ? \
AND m.timestamp >= ? \
ORDER BY rank \
LIMIT ?",
)
.bind(&sanitized)
.bind(exclude_conversation_id)
.bind(sender_id)
.bind(&cutoff_str)
.bind(limit)
.fetch_all(&self.pool)
.await
} else {
sqlx::query_as(
"SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
FROM messages_fts fts \
JOIN messages m ON m.rowid = fts.rowid \
JOIN conversations c ON c.id = m.conversation_id \
WHERE messages_fts MATCH ? \
AND m.conversation_id != ? \
AND c.sender_id = ? \
ORDER BY rank \
LIMIT ?",
)
.bind(&sanitized)
.bind(exclude_conversation_id)
.bind(sender_id)
.bind(limit)
.fetch_all(&self.pool)
.await
}
.map_err(|e| MemoryError::sqlite("fts search failed", e))?;
let mut out = Vec::with_capacity(rows.len());
for (id, conversation_id, role, content, timestamp) in rows {
out.push(MessageRow {
id,
conversation_id,
role,
content,
timestamp: parse_sqlite_timestamp(×tamp)?,
});
}
Ok(out)
}
}