Skip to main content

kernex_memory/store/
messages.rs

1//! Message storage and full-text search.
2
3use super::Store;
4use crate::error::MemoryError;
5use kernex_core::message::{Request, Response};
6use uuid::Uuid;
7
8impl Store {
9    /// Store a user message and assistant response.
10    ///
11    /// The `channel` parameter identifies the communication channel (e.g. "api",
12    /// "slack") since `Request` is channel-agnostic.
13    pub async fn store_exchange(
14        &self,
15        channel: &str,
16        incoming: &Request,
17        response: &Response,
18        project: &str,
19    ) -> Result<(), MemoryError> {
20        let conv_id = self
21            .get_or_create_conversation(channel, &incoming.sender_id, project)
22            .await?;
23
24        // Store user message.
25        let user_id = Uuid::new_v4().to_string();
26        sqlx::query(
27            "INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
28        )
29        .bind(&user_id)
30        .bind(&conv_id)
31        .bind(&incoming.text)
32        .execute(&self.pool)
33        .await
34        .map_err(|e| MemoryError::sqlite("insert failed", e))?;
35
36        // Store assistant response.
37        let asst_id = Uuid::new_v4().to_string();
38        let metadata_json = serde_json::to_string(&response.metadata)
39            .map_err(|e| MemoryError::serde("serialize failed", e))?;
40
41        sqlx::query(
42            "INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
43        )
44        .bind(&asst_id)
45        .bind(&conv_id)
46        .bind(&response.text)
47        .bind(&metadata_json)
48        .execute(&self.pool)
49        .await
50        .map_err(|e| MemoryError::sqlite("insert failed", e))?;
51
52        Ok(())
53    }
54
55    /// Search past messages across all conversations using FTS5 full-text search.
56    pub async fn search_messages(
57        &self,
58        query: &str,
59        exclude_conversation_id: &str,
60        sender_id: &str,
61        limit: i64,
62    ) -> Result<Vec<(String, String, String)>, MemoryError> {
63        if query.len() < 3 {
64            return Ok(Vec::new());
65        }
66
67        // Wrap in double quotes and escape internal quotes to prevent FTS5 operator
68        // injection (AND, OR, NOT, NEAR, *, etc.).
69        let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
70
71        let rows: Vec<(String, String, String)> = sqlx::query_as(
72            "SELECT m.role, m.content, m.timestamp \
73             FROM messages_fts fts \
74             JOIN messages m ON m.rowid = fts.rowid \
75             JOIN conversations c ON c.id = m.conversation_id \
76             WHERE messages_fts MATCH ? \
77             AND m.conversation_id != ? \
78             AND c.sender_id = ? \
79             ORDER BY rank \
80             LIMIT ?",
81        )
82        .bind(&sanitized)
83        .bind(exclude_conversation_id)
84        .bind(sender_id)
85        .bind(limit)
86        .fetch_all(&self.pool)
87        .await
88        .map_err(|e| MemoryError::sqlite("fts search failed", e))?;
89
90        Ok(rows)
91    }
92}