Skip to main content

kernex_memory/store/
messages.rs

1//! Message storage and full-text search.
2
3use std::time::SystemTime;
4
5use super::Store;
6use crate::error::MemoryError;
7use crate::types::{format_sqlite_timestamp, parse_sqlite_timestamp, MessageRow};
8use kernex_core::message::{Request, Response};
9use uuid::Uuid;
10
11impl Store {
12    /// Store a user message and assistant response.
13    ///
14    /// The `channel` parameter identifies the communication channel (e.g. "api",
15    /// "slack") since `Request` is channel-agnostic.
16    pub async fn store_exchange(
17        &self,
18        channel: &str,
19        incoming: &Request,
20        response: &Response,
21        project: &str,
22    ) -> Result<(), MemoryError> {
23        let conv_id = self
24            .get_or_create_conversation(channel, &incoming.sender_id, project)
25            .await?;
26
27        // Store user message.
28        let user_id = Uuid::new_v4().to_string();
29        sqlx::query(
30            "INSERT INTO messages (id, conversation_id, role, content) VALUES (?, ?, 'user', ?)",
31        )
32        .bind(&user_id)
33        .bind(&conv_id)
34        .bind(&incoming.text)
35        .execute(&self.pool)
36        .await
37        .map_err(|e| MemoryError::sqlite("insert failed", e))?;
38
39        // Store assistant response.
40        let asst_id = Uuid::new_v4().to_string();
41        let metadata_json = serde_json::to_string(&response.metadata)
42            .map_err(|e| MemoryError::serde("serialize failed", e))?;
43
44        sqlx::query(
45            "INSERT INTO messages (id, conversation_id, role, content, metadata_json) VALUES (?, ?, 'assistant', ?, ?)",
46        )
47        .bind(&asst_id)
48        .bind(&conv_id)
49        .bind(&response.text)
50        .bind(&metadata_json)
51        .execute(&self.pool)
52        .await
53        .map_err(|e| MemoryError::sqlite("insert failed", e))?;
54
55        Ok(())
56    }
57
58    /// Fetch a single message row by its UUID. Returns `None` when the
59    /// id is missing. The `MemoryStore` trait method
60    /// `get_message_by_id` delegates here.
61    pub async fn get_message_by_id(&self, id: &str) -> Result<Option<MessageRow>, MemoryError> {
62        let row: Option<(String, String, String, String, String)> = sqlx::query_as(
63            "SELECT id, conversation_id, role, content, timestamp \
64             FROM messages WHERE id = ?",
65        )
66        .bind(id)
67        .fetch_optional(&self.pool)
68        .await
69        .map_err(|e| MemoryError::sqlite("get_message_by_id failed", e))?;
70
71        match row {
72            Some((id, conversation_id, role, content, timestamp)) => Ok(Some(MessageRow {
73                id,
74                conversation_id,
75                role,
76                content,
77                timestamp: parse_sqlite_timestamp(&timestamp)?,
78            })),
79            None => Ok(None),
80        }
81    }
82
83    /// Search past messages across all conversations using FTS5 full-text search.
84    /// Honors an optional `since` recency cutoff: when `Some`, only
85    /// messages with `timestamp >= since` are returned, and `limit`
86    /// applies after the filter (resolves the S-search-2 ambiguity
87    /// flagged in the kx-mem-cli-promotion spec).
88    pub async fn search_messages(
89        &self,
90        query: &str,
91        exclude_conversation_id: &str,
92        sender_id: &str,
93        limit: i64,
94        since: Option<SystemTime>,
95    ) -> Result<Vec<MessageRow>, MemoryError> {
96        if query.len() < 3 {
97            return Ok(Vec::new());
98        }
99
100        // Wrap in double quotes and escape internal quotes to prevent FTS5 operator
101        // injection (AND, OR, NOT, NEAR, *, etc.).
102        let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
103
104        // Build the SQL with an optional `since` predicate. Using a
105        // branch instead of a single conditional SQL string keeps the
106        // prepared statement shape stable for sqlx's cache.
107        let rows: Vec<(String, String, String, String, String)> = if let Some(cutoff) = since {
108            let cutoff_str = format_sqlite_timestamp(cutoff);
109            sqlx::query_as(
110                "SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
111                 FROM messages_fts fts \
112                 JOIN messages m ON m.rowid = fts.rowid \
113                 JOIN conversations c ON c.id = m.conversation_id \
114                 WHERE messages_fts MATCH ? \
115                 AND m.conversation_id != ? \
116                 AND c.sender_id = ? \
117                 AND m.timestamp >= ? \
118                 ORDER BY rank \
119                 LIMIT ?",
120            )
121            .bind(&sanitized)
122            .bind(exclude_conversation_id)
123            .bind(sender_id)
124            .bind(&cutoff_str)
125            .bind(limit)
126            .fetch_all(&self.pool)
127            .await
128        } else {
129            sqlx::query_as(
130                "SELECT m.id, m.conversation_id, m.role, m.content, m.timestamp \
131                 FROM messages_fts fts \
132                 JOIN messages m ON m.rowid = fts.rowid \
133                 JOIN conversations c ON c.id = m.conversation_id \
134                 WHERE messages_fts MATCH ? \
135                 AND m.conversation_id != ? \
136                 AND c.sender_id = ? \
137                 ORDER BY rank \
138                 LIMIT ?",
139            )
140            .bind(&sanitized)
141            .bind(exclude_conversation_id)
142            .bind(sender_id)
143            .bind(limit)
144            .fetch_all(&self.pool)
145            .await
146        }
147        .map_err(|e| MemoryError::sqlite("fts search failed", e))?;
148
149        let mut out = Vec::with_capacity(rows.len());
150        for (id, conversation_id, role, content, timestamp) in rows {
151            out.push(MessageRow {
152                id,
153                conversation_id,
154                role,
155                content,
156                timestamp: parse_sqlite_timestamp(&timestamp)?,
157            });
158        }
159        Ok(out)
160    }
161}