Skip to main content

kernex_memory/store/
messages.rs

1//! Message storage and full-text search.
2
3use super::Store;
4use kernex_core::{
5    error::KernexError,
6    message::{Request, Response},
7};
8use uuid::Uuid;
9
10impl Store {
11    /// Store a user message and assistant response.
12    ///
13    /// The `channel` parameter identifies the communication channel (e.g. "api",
14    /// "slack") since `Request` is channel-agnostic.
15    pub async fn store_exchange(
16        &self,
17        channel: &str,
18        incoming: &Request,
19        response: &Response,
20        project: &str,
21    ) -> Result<(), KernexError> {
22        let conv_id = self
23            .get_or_create_conversation(channel, &incoming.sender_id, project)
24            .await?;
25
26        // Store user message.
27        let user_id = Uuid::new_v4().to_string();
28        sqlx::query(
29            "INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
30        )
31        .bind(&user_id)
32        .bind(&conv_id)
33        .bind(&incoming.text)
34        .execute(&self.pool)
35        .await
36        .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
37
38        // Store assistant response.
39        let asst_id = Uuid::new_v4().to_string();
40        let metadata_json = serde_json::to_string(&response.metadata)
41            .map_err(|e| KernexError::Store(format!("serialize failed: {e}")))?;
42
43        sqlx::query(
44            "INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
45        )
46        .bind(&asst_id)
47        .bind(&conv_id)
48        .bind(&response.text)
49        .bind(&metadata_json)
50        .execute(&self.pool)
51        .await
52        .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
53
54        Ok(())
55    }
56
57    /// Search past messages across all conversations using FTS5 full-text search.
58    pub async fn search_messages(
59        &self,
60        query: &str,
61        exclude_conversation_id: &str,
62        sender_id: &str,
63        limit: i64,
64    ) -> Result<Vec<(String, String, String)>, KernexError> {
65        if query.len() < 3 {
66            return Ok(Vec::new());
67        }
68
69        // Wrap in double quotes and escape internal quotes to prevent FTS5 operator
70        // injection (AND, OR, NOT, NEAR, *, etc.).
71        let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
72
73        let rows: Vec<(String, String, String)> = sqlx::query_as(
74            "SELECT m.role, m.content, m.timestamp \
75             FROM messages_fts fts \
76             JOIN messages m ON m.rowid = fts.rowid \
77             JOIN conversations c ON c.id = m.conversation_id \
78             WHERE messages_fts MATCH ? \
79             AND m.conversation_id != ? \
80             AND c.sender_id = ? \
81             ORDER BY rank \
82             LIMIT ?",
83        )
84        .bind(&sanitized)
85        .bind(exclude_conversation_id)
86        .bind(sender_id)
87        .bind(limit)
88        .fetch_all(&self.pool)
89        .await
90        .map_err(|e| KernexError::Store(format!("fts search failed: {e}")))?;
91
92        Ok(rows)
93    }
94}