Skip to main content

kernex_memory/store/
conversations.rs

1//! Conversation lifecycle — create, find, close, summaries, history, stats.
2
3use super::{Store, CONVERSATION_TIMEOUT_MINUTES};
4use kernex_core::error::KernexError;
5use uuid::Uuid;
6
7impl Store {
8    /// Get or create an active conversation for a given channel + sender + project.
9    ///
10    /// Only returns conversations that are `active` AND have `last_activity`
11    /// within the timeout window. Otherwise creates a new one.
12    pub(crate) async fn get_or_create_conversation(
13        &self,
14        channel: &str,
15        sender_id: &str,
16        project: &str,
17    ) -> Result<String, KernexError> {
18        let row: Option<(String,)> = sqlx::query_as(
19            "SELECT id FROM conversations \
20             WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active' \
21             AND datetime(last_activity) > datetime('now', ? || ' minutes') \
22             ORDER BY last_activity DESC LIMIT 1",
23        )
24        .bind(channel)
25        .bind(sender_id)
26        .bind(project)
27        .bind(-CONVERSATION_TIMEOUT_MINUTES)
28        .fetch_optional(&self.pool)
29        .await
30        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
31
32        if let Some((id,)) = row {
33            sqlx::query(
34                "UPDATE conversations SET last_activity = datetime('now'), updated_at = datetime('now') WHERE id = ?",
35            )
36            .bind(&id)
37            .execute(&self.pool)
38            .await
39            .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
40            return Ok(id);
41        }
42
43        let id = Uuid::new_v4().to_string();
44        sqlx::query(
45            "INSERT INTO conversations (id, channel, sender_id, project, status, last_activity) \
46             VALUES (?, ?, ?, ?, 'active', datetime('now'))",
47        )
48        .bind(&id)
49        .bind(channel)
50        .bind(sender_id)
51        .bind(project)
52        .execute(&self.pool)
53        .await
54        .map_err(|e| KernexError::Store(format!("insert failed: {e}")))?;
55
56        Ok(id)
57    }
58
59    /// Find active conversations that have been idle beyond the timeout.
60    pub async fn find_idle_conversations(
61        &self,
62    ) -> Result<Vec<(String, String, String, String)>, KernexError> {
63        let rows: Vec<(String, String, String, String)> = sqlx::query_as(
64            "SELECT id, channel, sender_id, project FROM conversations \
65             WHERE status = 'active' \
66             AND datetime(last_activity) <= datetime('now', ? || ' minutes')",
67        )
68        .bind(-CONVERSATION_TIMEOUT_MINUTES)
69        .fetch_all(&self.pool)
70        .await
71        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
72
73        Ok(rows)
74    }
75
76    /// Find all active conversations (for shutdown).
77    pub async fn find_all_active_conversations(
78        &self,
79    ) -> Result<Vec<(String, String, String, String)>, KernexError> {
80        let rows: Vec<(String, String, String, String)> = sqlx::query_as(
81            "SELECT id, channel, sender_id, project FROM conversations WHERE status = 'active'",
82        )
83        .fetch_all(&self.pool)
84        .await
85        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
86
87        Ok(rows)
88    }
89
90    /// Get all messages for a conversation (for summarization).
91    pub async fn get_conversation_messages(
92        &self,
93        conversation_id: &str,
94    ) -> Result<Vec<(String, String)>, KernexError> {
95        let rows: Vec<(String, String)> = sqlx::query_as(
96            "SELECT role, content FROM messages \
97             WHERE conversation_id = ? ORDER BY timestamp ASC",
98        )
99        .bind(conversation_id)
100        .fetch_all(&self.pool)
101        .await
102        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
103
104        Ok(rows)
105    }
106
107    /// Close a conversation with a summary.
108    pub async fn close_conversation(
109        &self,
110        conversation_id: &str,
111        summary: &str,
112    ) -> Result<(), KernexError> {
113        sqlx::query(
114            "UPDATE conversations SET status = 'closed', summary = ?, updated_at = datetime('now') WHERE id = ?",
115        )
116        .bind(summary)
117        .bind(conversation_id)
118        .execute(&self.pool)
119        .await
120        .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
121
122        Ok(())
123    }
124
125    /// Close the current active conversation for a sender + project.
126    pub async fn close_current_conversation(
127        &self,
128        channel: &str,
129        sender_id: &str,
130        project: &str,
131    ) -> Result<bool, KernexError> {
132        let result = sqlx::query(
133            "UPDATE conversations SET status = 'closed', updated_at = datetime('now') \
134             WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active'",
135        )
136        .bind(channel)
137        .bind(sender_id)
138        .bind(project)
139        .execute(&self.pool)
140        .await
141        .map_err(|e| KernexError::Store(format!("update failed: {e}")))?;
142
143        Ok(result.rows_affected() > 0)
144    }
145
146    /// Get recent closed conversation summaries for a sender.
147    pub async fn get_recent_summaries(
148        &self,
149        channel: &str,
150        sender_id: &str,
151        limit: i64,
152    ) -> Result<Vec<(String, String)>, KernexError> {
153        let rows: Vec<(String, String)> = sqlx::query_as(
154            "SELECT summary, updated_at FROM conversations \
155             WHERE channel = ? AND sender_id = ? AND status = 'closed' AND summary IS NOT NULL \
156             ORDER BY updated_at DESC LIMIT ?",
157        )
158        .bind(channel)
159        .bind(sender_id)
160        .bind(limit)
161        .fetch_all(&self.pool)
162        .await
163        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
164
165        Ok(rows)
166    }
167
168    /// Get recent conversation summaries across all users.
169    pub async fn get_all_recent_summaries(
170        &self,
171        limit: i64,
172    ) -> Result<Vec<(String, String)>, KernexError> {
173        let rows: Vec<(String, String)> = sqlx::query_as(
174            "SELECT summary, updated_at FROM conversations \
175             WHERE status = 'closed' AND summary IS NOT NULL \
176             ORDER BY updated_at DESC LIMIT ?",
177        )
178        .bind(limit)
179        .fetch_all(&self.pool)
180        .await
181        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
182
183        Ok(rows)
184    }
185
186    /// Get conversation history (summaries with timestamps) for a sender.
187    pub async fn get_history(
188        &self,
189        channel: &str,
190        sender_id: &str,
191        limit: i64,
192    ) -> Result<Vec<(String, String)>, KernexError> {
193        let rows: Vec<(String, String)> = sqlx::query_as(
194            "SELECT COALESCE(summary, '(no summary)'), updated_at FROM conversations \
195             WHERE channel = ? AND sender_id = ? AND status = 'closed' \
196             ORDER BY updated_at DESC LIMIT ?",
197        )
198        .bind(channel)
199        .bind(sender_id)
200        .bind(limit)
201        .fetch_all(&self.pool)
202        .await
203        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
204
205        Ok(rows)
206    }
207
208    /// Get memory statistics for a sender.
209    pub async fn get_memory_stats(&self, sender_id: &str) -> Result<(i64, i64, i64), KernexError> {
210        let (conv_count,): (i64,) =
211            sqlx::query_as("SELECT COUNT(*) FROM conversations WHERE sender_id = ?")
212                .bind(sender_id)
213                .fetch_one(&self.pool)
214                .await
215                .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
216
217        let (msg_count,): (i64,) = sqlx::query_as(
218            "SELECT COUNT(*) FROM messages m \
219             JOIN conversations c ON m.conversation_id = c.id \
220             WHERE c.sender_id = ?",
221        )
222        .bind(sender_id)
223        .fetch_one(&self.pool)
224        .await
225        .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
226
227        let (fact_count,): (i64,) =
228            sqlx::query_as("SELECT COUNT(*) FROM facts WHERE sender_id = ?")
229                .bind(sender_id)
230                .fetch_one(&self.pool)
231                .await
232                .map_err(|e| KernexError::Store(format!("query failed: {e}")))?;
233
234        Ok((conv_count, msg_count, fact_count))
235    }
236}