Skip to main content

kernex_memory/store/
conversations.rs

1//! Conversation lifecycle — create, find, close, summaries, history, stats.
2
3use super::{Store, CONVERSATION_TIMEOUT_MINUTES};
4use crate::error::MemoryError;
5use uuid::Uuid;
6
7impl Store {
8    /// Get or create an active conversation for a given channel + sender + project.
9    ///
10    /// Only returns conversations that are `active` AND have `last_activity`
11    /// within the timeout window. Otherwise creates a new one.
12    pub(crate) async fn get_or_create_conversation(
13        &self,
14        channel: &str,
15        sender_id: &str,
16        project: &str,
17    ) -> Result<String, MemoryError> {
18        let row: Option<(String,)> = sqlx::query_as(
19            "SELECT id FROM conversations \
20             WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active' \
21             AND datetime(last_activity) > datetime('now', ? || ' minutes') \
22             ORDER BY last_activity DESC LIMIT 1",
23        )
24        .bind(channel)
25        .bind(sender_id)
26        .bind(project)
27        .bind(-CONVERSATION_TIMEOUT_MINUTES)
28        .fetch_optional(&self.pool)
29        .await
30        .map_err(|e| MemoryError::sqlite("query failed", e))?;
31
32        if let Some((id,)) = row {
33            sqlx::query(
34                "UPDATE conversations SET last_activity = datetime('now'), updated_at = datetime('now') WHERE id = ?",
35            )
36            .bind(&id)
37            .execute(&self.pool)
38            .await
39            .map_err(|e| MemoryError::sqlite("update failed", e))?;
40            return Ok(id);
41        }
42
43        let id = Uuid::new_v4().to_string();
44        sqlx::query(
45            "INSERT INTO conversations (id, channel, sender_id, project, status, last_activity) \
46             VALUES (?, ?, ?, ?, 'active', datetime('now'))",
47        )
48        .bind(&id)
49        .bind(channel)
50        .bind(sender_id)
51        .bind(project)
52        .execute(&self.pool)
53        .await
54        .map_err(|e| MemoryError::sqlite("insert failed", e))?;
55
56        Ok(id)
57    }
58
59    /// Find active conversations that have been idle beyond the timeout.
60    pub async fn find_idle_conversations(
61        &self,
62    ) -> Result<Vec<(String, String, String, String)>, MemoryError> {
63        let rows: Vec<(String, String, String, String)> = sqlx::query_as(
64            "SELECT id, channel, sender_id, project FROM conversations \
65             WHERE status = 'active' \
66             AND datetime(last_activity) <= datetime('now', ? || ' minutes')",
67        )
68        .bind(-CONVERSATION_TIMEOUT_MINUTES)
69        .fetch_all(&self.pool)
70        .await
71        .map_err(|e| MemoryError::sqlite("query failed", e))?;
72
73        Ok(rows)
74    }
75
76    /// Find all active conversations (for shutdown).
77    pub async fn find_all_active_conversations(
78        &self,
79    ) -> Result<Vec<(String, String, String, String)>, MemoryError> {
80        let rows: Vec<(String, String, String, String)> = sqlx::query_as(
81            "SELECT id, channel, sender_id, project FROM conversations WHERE status = 'active'",
82        )
83        .fetch_all(&self.pool)
84        .await
85        .map_err(|e| MemoryError::sqlite("query failed", e))?;
86
87        Ok(rows)
88    }
89
90    /// Get all messages for a conversation (for summarization).
91    pub async fn get_conversation_messages(
92        &self,
93        conversation_id: &str,
94    ) -> Result<Vec<(String, String)>, MemoryError> {
95        let rows: Vec<(String, String)> = sqlx::query_as(
96            "SELECT role, content FROM messages \
97             WHERE conversation_id = ? ORDER BY timestamp ASC",
98        )
99        .bind(conversation_id)
100        .fetch_all(&self.pool)
101        .await
102        .map_err(|e| MemoryError::sqlite("query failed", e))?;
103
104        Ok(rows)
105    }
106
107    /// Close a conversation with a summary.
108    pub async fn close_conversation(
109        &self,
110        conversation_id: &str,
111        summary: &str,
112    ) -> Result<(), MemoryError> {
113        sqlx::query(
114            "UPDATE conversations SET status = 'closed', summary = ?, updated_at = datetime('now') WHERE id = ?",
115        )
116        .bind(summary)
117        .bind(conversation_id)
118        .execute(&self.pool)
119        .await
120        .map_err(|e| MemoryError::sqlite("update failed", e))?;
121
122        Ok(())
123    }
124
125    /// Close the current active conversation for a sender + project.
126    pub async fn close_current_conversation(
127        &self,
128        channel: &str,
129        sender_id: &str,
130        project: &str,
131    ) -> Result<bool, MemoryError> {
132        let result = sqlx::query(
133            "UPDATE conversations SET status = 'closed', updated_at = datetime('now') \
134             WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active'",
135        )
136        .bind(channel)
137        .bind(sender_id)
138        .bind(project)
139        .execute(&self.pool)
140        .await
141        .map_err(|e| MemoryError::sqlite("update failed", e))?;
142
143        Ok(result.rows_affected() > 0)
144    }
145
146    /// Get recent closed conversation summaries for a sender.
147    pub async fn get_recent_summaries(
148        &self,
149        channel: &str,
150        sender_id: &str,
151        limit: i64,
152    ) -> Result<Vec<(String, String)>, MemoryError> {
153        let rows: Vec<(String, String)> = sqlx::query_as(
154            "SELECT summary, updated_at FROM conversations \
155             WHERE channel = ? AND sender_id = ? AND status = 'closed' AND summary IS NOT NULL \
156             ORDER BY updated_at DESC LIMIT ?",
157        )
158        .bind(channel)
159        .bind(sender_id)
160        .bind(limit)
161        .fetch_all(&self.pool)
162        .await
163        .map_err(|e| MemoryError::sqlite("query failed", e))?;
164
165        Ok(rows)
166    }
167
168    /// Get recent conversation summaries across all users.
169    pub async fn get_all_recent_summaries(
170        &self,
171        limit: i64,
172    ) -> Result<Vec<(String, String)>, MemoryError> {
173        let rows: Vec<(String, String)> = sqlx::query_as(
174            "SELECT summary, updated_at FROM conversations \
175             WHERE status = 'closed' AND summary IS NOT NULL \
176             ORDER BY updated_at DESC LIMIT ?",
177        )
178        .bind(limit)
179        .fetch_all(&self.pool)
180        .await
181        .map_err(|e| MemoryError::sqlite("query failed", e))?;
182
183        Ok(rows)
184    }
185
186    /// Get conversation history (summaries with timestamps) for a sender.
187    /// Surfaces `conversation_id` alongside the summary so consumers can
188    /// dispatch follow-up reads (`get_message_by_id`,
189    /// `get_recent_summaries`) by id rather than by string match.
190    pub async fn get_history(
191        &self,
192        channel: &str,
193        sender_id: &str,
194        limit: i64,
195    ) -> Result<Vec<crate::types::HistoryRow>, MemoryError> {
196        let rows: Vec<(String, String, String)> = sqlx::query_as(
197            "SELECT id, COALESCE(summary, '(no summary)'), updated_at FROM conversations \
198             WHERE channel = ? AND sender_id = ? AND status = 'closed' \
199             ORDER BY updated_at DESC LIMIT ?",
200        )
201        .bind(channel)
202        .bind(sender_id)
203        .bind(limit)
204        .fetch_all(&self.pool)
205        .await
206        .map_err(|e| MemoryError::sqlite("query failed", e))?;
207
208        let mut out = Vec::with_capacity(rows.len());
209        for (conversation_id, summary, updated_at) in rows {
210            out.push(crate::types::HistoryRow {
211                conversation_id,
212                summary,
213                updated_at: crate::types::parse_sqlite_timestamp(&updated_at)?,
214            });
215        }
216        Ok(out)
217    }
218
219    /// Get memory statistics for a sender.
220    pub async fn get_memory_stats(&self, sender_id: &str) -> Result<(i64, i64, i64), MemoryError> {
221        let (conv_count,): (i64,) =
222            sqlx::query_as("SELECT COUNT(*) FROM conversations WHERE sender_id = ?")
223                .bind(sender_id)
224                .fetch_one(&self.pool)
225                .await
226                .map_err(|e| MemoryError::sqlite("query failed", e))?;
227
228        let (msg_count,): (i64,) = sqlx::query_as(
229            "SELECT COUNT(*) FROM messages m \
230             JOIN conversations c ON m.conversation_id = c.id \
231             WHERE c.sender_id = ?",
232        )
233        .bind(sender_id)
234        .fetch_one(&self.pool)
235        .await
236        .map_err(|e| MemoryError::sqlite("query failed", e))?;
237
238        let (fact_count,): (i64,) = sqlx::query_as(
239            "SELECT COUNT(*) FROM facts \
240             WHERE sender_id = ? AND deleted_at IS NULL",
241        )
242        .bind(sender_id)
243        .fetch_one(&self.pool)
244        .await
245        .map_err(|e| MemoryError::sqlite("query failed", e))?;
246
247        Ok((conv_count, msg_count, fact_count))
248    }
249}