1use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Turn free-form user text into a string that is safe to pass to an FTS5 `MATCH` operand.
///
/// The input is split on every non-alphanumeric character, empty pieces are dropped, and the
/// remaining tokens are joined with single spaces, so the FTS5 query is a plain list of terms.
/// Punctuation that FTS5 treats as syntax (quotes, parentheses, `*`, `^`, `:`, `-`, ...) never
/// reaches the query parser.
///
/// The barewords `AND`, `OR` and `NOT` (upper case, exactly) are FTS5 operators. Left bare, they
/// would turn inputs such as `"NOT"` or `"cats OR"` into FTS5 syntax errors. Those tokens are
/// therefore wrapped in double quotes so FTS5 treats them as ordinary search terms, the same way
/// it already treats their lower-case spellings. Every other token is emitted unchanged.
///
/// Returns an empty string when `query` contains no alphanumeric characters; callers use that to
/// skip the search entirely.
pub(crate) fn sanitize_fts5_query(query: &str) -> String {
    query
        .split(|c: char| !c.is_alphanumeric())
        .filter(|t| !t.is_empty())
        .map(|t| match t {
            // Tokens are purely alphanumeric, so quoting cannot introduce an unbalanced quote.
            "AND" | "OR" | "NOT" => format!("\"{t}\""),
            _ => t.to_owned(),
        })
        .collect::<Vec<_>>()
        .join(" ")
}
26
27fn parse_role(s: &str) -> Role {
28 match s {
29 "assistant" => Role::Assistant,
30 "system" => Role::System,
31 _ => Role::User,
32 }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37 match role {
38 Role::System => "system",
39 Role::User => "user",
40 Role::Assistant => "assistant",
41 }
42}
43
44fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
49 if parts_json == "[]" {
50 return vec![];
51 }
52 match serde_json::from_str(parts_json) {
53 Ok(p) => p,
54 Err(e) => {
55 let truncated = parts_json.chars().take(120).collect::<String>();
56 tracing::warn!(
57 role = %role_str,
58 parts_json = %truncated,
59 error = %e,
60 "failed to deserialize message parts, falling back to empty"
61 );
62 vec![]
63 }
64 }
65}
66
67impl SqliteStore {
68 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
74 let row: (ConversationId,) =
75 sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
76 .fetch_one(&self.pool)
77 .await?;
78 Ok(row.0)
79 }
80
81 pub async fn save_message(
87 &self,
88 conversation_id: ConversationId,
89 role: &str,
90 content: &str,
91 ) -> Result<MessageId, MemoryError> {
92 self.save_message_with_parts(conversation_id, role, content, "[]")
93 .await
94 }
95
96 pub async fn save_message_with_parts(
102 &self,
103 conversation_id: ConversationId,
104 role: &str,
105 content: &str,
106 parts_json: &str,
107 ) -> Result<MessageId, MemoryError> {
108 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
109 .await
110 }
111
112 pub async fn save_message_with_metadata(
118 &self,
119 conversation_id: ConversationId,
120 role: &str,
121 content: &str,
122 parts_json: &str,
123 agent_visible: bool,
124 user_visible: bool,
125 ) -> Result<MessageId, MemoryError> {
126 let row: (MessageId,) = sqlx::query_as(
127 "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible) \
128 VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
129 )
130 .bind(conversation_id)
131 .bind(role)
132 .bind(content)
133 .bind(parts_json)
134 .bind(i64::from(agent_visible))
135 .bind(i64::from(user_visible))
136 .fetch_one(&self.pool)
137 .await?;
138 Ok(row.0)
139 }
140
141 pub async fn load_history(
147 &self,
148 conversation_id: ConversationId,
149 limit: u32,
150 ) -> Result<Vec<Message>, MemoryError> {
151 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
152 "SELECT role, content, parts, agent_visible, user_visible FROM (\
153 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
154 WHERE conversation_id = ? AND deleted_at IS NULL \
155 ORDER BY id DESC \
156 LIMIT ?\
157 ) ORDER BY id ASC",
158 )
159 .bind(conversation_id)
160 .bind(limit)
161 .fetch_all(&self.pool)
162 .await?;
163
164 let messages = rows
165 .into_iter()
166 .map(
167 |(role_str, content, parts_json, agent_visible, user_visible)| {
168 let parts = parse_parts_json(&role_str, &parts_json);
169 Message {
170 role: parse_role(&role_str),
171 content,
172 parts,
173 metadata: MessageMetadata {
174 agent_visible: agent_visible != 0,
175 user_visible: user_visible != 0,
176 compacted_at: None,
177 deferred_summary: None,
178 focus_pinned: false,
179 focus_marker_id: None,
180 },
181 }
182 },
183 )
184 .collect();
185 Ok(messages)
186 }
187
188 pub async fn load_history_filtered(
196 &self,
197 conversation_id: ConversationId,
198 limit: u32,
199 agent_visible: Option<bool>,
200 user_visible: Option<bool>,
201 ) -> Result<Vec<Message>, MemoryError> {
202 let av = agent_visible.map(i64::from);
203 let uv = user_visible.map(i64::from);
204
205 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
206 "WITH recent AS (\
207 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
208 WHERE conversation_id = ? \
209 AND deleted_at IS NULL \
210 AND (? IS NULL OR agent_visible = ?) \
211 AND (? IS NULL OR user_visible = ?) \
212 ORDER BY id DESC \
213 LIMIT ?\
214 ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
215 )
216 .bind(conversation_id)
217 .bind(av)
218 .bind(av)
219 .bind(uv)
220 .bind(uv)
221 .bind(limit)
222 .fetch_all(&self.pool)
223 .await?;
224
225 let messages = rows
226 .into_iter()
227 .map(
228 |(role_str, content, parts_json, agent_visible, user_visible)| {
229 let parts = parse_parts_json(&role_str, &parts_json);
230 Message {
231 role: parse_role(&role_str),
232 content,
233 parts,
234 metadata: MessageMetadata {
235 agent_visible: agent_visible != 0,
236 user_visible: user_visible != 0,
237 compacted_at: None,
238 deferred_summary: None,
239 focus_pinned: false,
240 focus_marker_id: None,
241 },
242 }
243 },
244 )
245 .collect();
246 Ok(messages)
247 }
248
249 pub async fn replace_conversation(
261 &self,
262 conversation_id: ConversationId,
263 compacted_range: std::ops::RangeInclusive<MessageId>,
264 summary_role: &str,
265 summary_content: &str,
266 ) -> Result<MessageId, MemoryError> {
267 let now = {
268 let secs = std::time::SystemTime::now()
269 .duration_since(std::time::UNIX_EPOCH)
270 .unwrap_or_default()
271 .as_secs();
272 format!("{secs}")
273 };
274 let start_id = compacted_range.start().0;
275 let end_id = compacted_range.end().0;
276
277 let mut tx = self.pool.begin().await?;
278
279 sqlx::query(
280 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
281 WHERE conversation_id = ? AND id >= ? AND id <= ?",
282 )
283 .bind(&now)
284 .bind(conversation_id)
285 .bind(start_id)
286 .bind(end_id)
287 .execute(&mut *tx)
288 .await?;
289
290 let row: (MessageId,) = sqlx::query_as(
291 "INSERT INTO messages \
292 (conversation_id, role, content, parts, agent_visible, user_visible) \
293 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
294 )
295 .bind(conversation_id)
296 .bind(summary_role)
297 .bind(summary_content)
298 .fetch_one(&mut *tx)
299 .await?;
300
301 tx.commit().await?;
302
303 Ok(row.0)
304 }
305
306 pub async fn oldest_message_ids(
312 &self,
313 conversation_id: ConversationId,
314 n: u32,
315 ) -> Result<Vec<MessageId>, MemoryError> {
316 let rows: Vec<(MessageId,)> = sqlx::query_as(
317 "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
318 )
319 .bind(conversation_id)
320 .bind(n)
321 .fetch_all(&self.pool)
322 .await?;
323 Ok(rows.into_iter().map(|r| r.0).collect())
324 }
325
326 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
332 let row: Option<(ConversationId,)> =
333 sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
334 .fetch_optional(&self.pool)
335 .await?;
336 Ok(row.map(|r| r.0))
337 }
338
339 pub async fn message_by_id(
345 &self,
346 message_id: MessageId,
347 ) -> Result<Option<Message>, MemoryError> {
348 let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
349 "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
350 )
351 .bind(message_id)
352 .fetch_optional(&self.pool)
353 .await?;
354
355 Ok(row.map(
356 |(role_str, content, parts_json, agent_visible, user_visible)| {
357 let parts = parse_parts_json(&role_str, &parts_json);
358 Message {
359 role: parse_role(&role_str),
360 content,
361 parts,
362 metadata: MessageMetadata {
363 agent_visible: agent_visible != 0,
364 user_visible: user_visible != 0,
365 compacted_at: None,
366 deferred_summary: None,
367 focus_pinned: false,
368 focus_marker_id: None,
369 },
370 }
371 },
372 ))
373 }
374
375 pub async fn messages_by_ids(
381 &self,
382 ids: &[MessageId],
383 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
384 if ids.is_empty() {
385 return Ok(Vec::new());
386 }
387
388 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
389
390 let query = format!(
391 "SELECT id, role, content, parts FROM messages \
392 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
393 );
394 let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
395 for &id in ids {
396 q = q.bind(id);
397 }
398
399 let rows = q.fetch_all(&self.pool).await?;
400
401 Ok(rows
402 .into_iter()
403 .map(|(id, role_str, content, parts_json)| {
404 let parts = parse_parts_json(&role_str, &parts_json);
405 (
406 id,
407 Message {
408 role: parse_role(&role_str),
409 content,
410 parts,
411 metadata: MessageMetadata::default(),
412 },
413 )
414 })
415 .collect())
416 }
417
418 pub async fn unembedded_message_ids(
424 &self,
425 limit: Option<usize>,
426 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
427 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
428
429 let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
430 "SELECT m.id, m.conversation_id, m.role, m.content \
431 FROM messages m \
432 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
433 WHERE em.id IS NULL AND m.deleted_at IS NULL \
434 ORDER BY m.id ASC \
435 LIMIT ?",
436 )
437 .bind(effective_limit)
438 .fetch_all(&self.pool)
439 .await?;
440
441 Ok(rows)
442 }
443
444 pub async fn count_messages(
450 &self,
451 conversation_id: ConversationId,
452 ) -> Result<i64, MemoryError> {
453 let row: (i64,) = sqlx::query_as(
454 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
455 )
456 .bind(conversation_id)
457 .fetch_one(&self.pool)
458 .await?;
459 Ok(row.0)
460 }
461
462 pub async fn count_messages_after(
468 &self,
469 conversation_id: ConversationId,
470 after_id: MessageId,
471 ) -> Result<i64, MemoryError> {
472 let row: (i64,) =
473 sqlx::query_as(
474 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
475 )
476 .bind(conversation_id)
477 .bind(after_id)
478 .fetch_one(&self.pool)
479 .await?;
480 Ok(row.0)
481 }
482
483 pub async fn keyword_search(
492 &self,
493 query: &str,
494 limit: usize,
495 conversation_id: Option<ConversationId>,
496 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
497 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
498 let safe_query = sanitize_fts5_query(query);
499 if safe_query.is_empty() {
500 return Ok(Vec::new());
501 }
502
503 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
504 sqlx::query_as(
505 "SELECT m.id, -rank AS score \
506 FROM messages_fts f \
507 JOIN messages m ON m.id = f.rowid \
508 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
509 ORDER BY rank \
510 LIMIT ?",
511 )
512 .bind(&safe_query)
513 .bind(cid)
514 .bind(effective_limit)
515 .fetch_all(&self.pool)
516 .await?
517 } else {
518 sqlx::query_as(
519 "SELECT m.id, -rank AS score \
520 FROM messages_fts f \
521 JOIN messages m ON m.id = f.rowid \
522 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
523 ORDER BY rank \
524 LIMIT ?",
525 )
526 .bind(&safe_query)
527 .bind(effective_limit)
528 .fetch_all(&self.pool)
529 .await?
530 };
531
532 Ok(rows)
533 }
534
535 pub async fn keyword_search_with_time_range(
548 &self,
549 query: &str,
550 limit: usize,
551 conversation_id: Option<ConversationId>,
552 after: Option<&str>,
553 before: Option<&str>,
554 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
555 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
556 let safe_query = sanitize_fts5_query(query);
557 if safe_query.is_empty() {
558 return Ok(Vec::new());
559 }
560
561 let after_clause = if after.is_some() {
563 " AND m.created_at > ?"
564 } else {
565 ""
566 };
567 let before_clause = if before.is_some() {
568 " AND m.created_at < ?"
569 } else {
570 ""
571 };
572 let conv_clause = if conversation_id.is_some() {
573 " AND m.conversation_id = ?"
574 } else {
575 ""
576 };
577
578 let sql = format!(
579 "SELECT m.id, -rank AS score \
580 FROM messages_fts f \
581 JOIN messages m ON m.id = f.rowid \
582 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
583 {after_clause}{before_clause}{conv_clause} \
584 ORDER BY rank \
585 LIMIT ?"
586 );
587
588 let mut q = sqlx::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
589 if let Some(a) = after {
590 q = q.bind(a);
591 }
592 if let Some(b) = before {
593 q = q.bind(b);
594 }
595 if let Some(cid) = conversation_id {
596 q = q.bind(cid);
597 }
598 q = q.bind(effective_limit);
599
600 Ok(q.fetch_all(&self.pool).await?)
601 }
602
603 pub async fn message_timestamps(
611 &self,
612 ids: &[MessageId],
613 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
614 if ids.is_empty() {
615 return Ok(std::collections::HashMap::new());
616 }
617
618 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
619 let query = format!(
620 "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
621 FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
622 );
623 let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
624 for &id in ids {
625 q = q.bind(id);
626 }
627
628 let rows = q.fetch_all(&self.pool).await?;
629 Ok(rows.into_iter().collect())
630 }
631
632 pub async fn load_messages_range(
638 &self,
639 conversation_id: ConversationId,
640 after_message_id: MessageId,
641 limit: usize,
642 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
643 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
644
645 let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
646 "SELECT id, role, content FROM messages \
647 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
648 ORDER BY id ASC LIMIT ?",
649 )
650 .bind(conversation_id)
651 .bind(after_message_id)
652 .bind(effective_limit)
653 .fetch_all(&self.pool)
654 .await?;
655
656 Ok(rows)
657 }
658
659 pub async fn get_eviction_candidates(
667 &self,
668 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
669 let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
670 "SELECT id, created_at, last_accessed, access_count \
671 FROM messages WHERE deleted_at IS NULL",
672 )
673 .fetch_all(&self.pool)
674 .await?;
675
676 Ok(rows
677 .into_iter()
678 .map(
679 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
680 id,
681 created_at,
682 last_accessed,
683 access_count: access_count.try_into().unwrap_or(0),
684 },
685 )
686 .collect())
687 }
688
689 pub async fn soft_delete_messages(
697 &self,
698 ids: &[MessageId],
699 ) -> Result<(), crate::error::MemoryError> {
700 if ids.is_empty() {
701 return Ok(());
702 }
703 for &id in ids {
705 sqlx::query(
706 "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
707 )
708 .bind(id)
709 .execute(&self.pool)
710 .await?;
711 }
712 Ok(())
713 }
714
715 pub async fn get_soft_deleted_message_ids(
721 &self,
722 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
723 let rows: Vec<(MessageId,)> = sqlx::query_as(
724 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
725 )
726 .fetch_all(&self.pool)
727 .await?;
728 Ok(rows.into_iter().map(|(id,)| id).collect())
729 }
730
731 pub async fn mark_qdrant_cleaned(
737 &self,
738 ids: &[MessageId],
739 ) -> Result<(), crate::error::MemoryError> {
740 for &id in ids {
741 sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
742 .bind(id)
743 .execute(&self.pool)
744 .await?;
745 }
746 Ok(())
747 }
748}
749
750#[cfg(test)]
751mod tests;