1use zeph_llm::provider::{Message, MessageMetadata, MessagePart, Role};
5
6use super::SqliteStore;
7use crate::error::MemoryError;
8use crate::types::{ConversationId, MessageId};
9
/// Reduces free-form text to a space-separated list of alphanumeric tokens.
///
/// The input is split at every character for which [`char::is_alphanumeric`]
/// is false, empty fragments are discarded, and the remaining tokens are
/// joined with single spaces. An input without any alphanumeric character
/// yields an empty string.
///
/// NOTE(review): purely alphabetic words such as `AND`, `OR`, `NOT` or `NEAR`
/// pass through unchanged; FTS5 may treat them as operators — confirm callers
/// are fine with that.
pub(crate) fn sanitize_fts5_query(query: &str) -> String {
    let mut sanitized = String::with_capacity(query.len());
    for token in query.split(|ch: char| !ch.is_alphanumeric()) {
        if token.is_empty() {
            continue;
        }
        // Separate tokens with exactly one space, never a leading one.
        if !sanitized.is_empty() {
            sanitized.push(' ');
        }
        sanitized.push_str(token);
    }
    sanitized
}
26
27fn parse_role(s: &str) -> Role {
28 match s {
29 "assistant" => Role::Assistant,
30 "system" => Role::System,
31 _ => Role::User,
32 }
33}
34
35#[must_use]
36pub fn role_str(role: Role) -> &'static str {
37 match role {
38 Role::System => "system",
39 Role::User => "user",
40 Role::Assistant => "assistant",
41 }
42}
43
44fn parse_parts_json(role_str: &str, parts_json: &str) -> Vec<MessagePart> {
49 if parts_json == "[]" {
50 return vec![];
51 }
52 match serde_json::from_str(parts_json) {
53 Ok(p) => p,
54 Err(e) => {
55 let truncated = parts_json.chars().take(120).collect::<String>();
56 tracing::warn!(
57 role = %role_str,
58 parts_json = %truncated,
59 error = %e,
60 "failed to deserialize message parts, falling back to empty"
61 );
62 vec![]
63 }
64 }
65}
66
67impl SqliteStore {
68 pub async fn create_conversation(&self) -> Result<ConversationId, MemoryError> {
74 let row: (ConversationId,) =
75 sqlx::query_as("INSERT INTO conversations DEFAULT VALUES RETURNING id")
76 .fetch_one(&self.pool)
77 .await?;
78 Ok(row.0)
79 }
80
81 pub async fn save_message(
87 &self,
88 conversation_id: ConversationId,
89 role: &str,
90 content: &str,
91 ) -> Result<MessageId, MemoryError> {
92 self.save_message_with_parts(conversation_id, role, content, "[]")
93 .await
94 }
95
96 pub async fn save_message_with_parts(
102 &self,
103 conversation_id: ConversationId,
104 role: &str,
105 content: &str,
106 parts_json: &str,
107 ) -> Result<MessageId, MemoryError> {
108 self.save_message_with_metadata(conversation_id, role, content, parts_json, true, true)
109 .await
110 }
111
112 pub async fn save_message_with_metadata(
118 &self,
119 conversation_id: ConversationId,
120 role: &str,
121 content: &str,
122 parts_json: &str,
123 agent_visible: bool,
124 user_visible: bool,
125 ) -> Result<MessageId, MemoryError> {
126 let importance_score = crate::semantic::importance::compute_importance(content, role);
127 let row: (MessageId,) = sqlx::query_as(
128 "INSERT INTO messages (conversation_id, role, content, parts, agent_visible, user_visible, importance_score) \
129 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id",
130 )
131 .bind(conversation_id)
132 .bind(role)
133 .bind(content)
134 .bind(parts_json)
135 .bind(i64::from(agent_visible))
136 .bind(i64::from(user_visible))
137 .bind(importance_score)
138 .fetch_one(&self.pool)
139 .await?;
140 Ok(row.0)
141 }
142
143 pub async fn load_history(
149 &self,
150 conversation_id: ConversationId,
151 limit: u32,
152 ) -> Result<Vec<Message>, MemoryError> {
153 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
154 "SELECT role, content, parts, agent_visible, user_visible FROM (\
155 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
156 WHERE conversation_id = ? AND deleted_at IS NULL \
157 ORDER BY id DESC \
158 LIMIT ?\
159 ) ORDER BY id ASC",
160 )
161 .bind(conversation_id)
162 .bind(limit)
163 .fetch_all(&self.pool)
164 .await?;
165
166 let messages = rows
167 .into_iter()
168 .map(
169 |(role_str, content, parts_json, agent_visible, user_visible)| {
170 let parts = parse_parts_json(&role_str, &parts_json);
171 Message {
172 role: parse_role(&role_str),
173 content,
174 parts,
175 metadata: MessageMetadata {
176 agent_visible: agent_visible != 0,
177 user_visible: user_visible != 0,
178 compacted_at: None,
179 deferred_summary: None,
180 focus_pinned: false,
181 focus_marker_id: None,
182 },
183 }
184 },
185 )
186 .collect();
187 Ok(messages)
188 }
189
190 pub async fn load_history_filtered(
198 &self,
199 conversation_id: ConversationId,
200 limit: u32,
201 agent_visible: Option<bool>,
202 user_visible: Option<bool>,
203 ) -> Result<Vec<Message>, MemoryError> {
204 let av = agent_visible.map(i64::from);
205 let uv = user_visible.map(i64::from);
206
207 let rows: Vec<(String, String, String, i64, i64)> = sqlx::query_as(
208 "WITH recent AS (\
209 SELECT role, content, parts, agent_visible, user_visible, id FROM messages \
210 WHERE conversation_id = ? \
211 AND deleted_at IS NULL \
212 AND (? IS NULL OR agent_visible = ?) \
213 AND (? IS NULL OR user_visible = ?) \
214 ORDER BY id DESC \
215 LIMIT ?\
216 ) SELECT role, content, parts, agent_visible, user_visible FROM recent ORDER BY id ASC",
217 )
218 .bind(conversation_id)
219 .bind(av)
220 .bind(av)
221 .bind(uv)
222 .bind(uv)
223 .bind(limit)
224 .fetch_all(&self.pool)
225 .await?;
226
227 let messages = rows
228 .into_iter()
229 .map(
230 |(role_str, content, parts_json, agent_visible, user_visible)| {
231 let parts = parse_parts_json(&role_str, &parts_json);
232 Message {
233 role: parse_role(&role_str),
234 content,
235 parts,
236 metadata: MessageMetadata {
237 agent_visible: agent_visible != 0,
238 user_visible: user_visible != 0,
239 compacted_at: None,
240 deferred_summary: None,
241 focus_pinned: false,
242 focus_marker_id: None,
243 },
244 }
245 },
246 )
247 .collect();
248 Ok(messages)
249 }
250
251 pub async fn replace_conversation(
263 &self,
264 conversation_id: ConversationId,
265 compacted_range: std::ops::RangeInclusive<MessageId>,
266 summary_role: &str,
267 summary_content: &str,
268 ) -> Result<MessageId, MemoryError> {
269 let now = {
270 let secs = std::time::SystemTime::now()
271 .duration_since(std::time::UNIX_EPOCH)
272 .unwrap_or_default()
273 .as_secs();
274 format!("{secs}")
275 };
276 let start_id = compacted_range.start().0;
277 let end_id = compacted_range.end().0;
278
279 let mut tx = self.pool.begin().await?;
280
281 sqlx::query(
282 "UPDATE messages SET agent_visible = 0, compacted_at = ? \
283 WHERE conversation_id = ? AND id >= ? AND id <= ?",
284 )
285 .bind(&now)
286 .bind(conversation_id)
287 .bind(start_id)
288 .bind(end_id)
289 .execute(&mut *tx)
290 .await?;
291
292 let row: (MessageId,) = sqlx::query_as(
294 "INSERT INTO messages \
295 (conversation_id, role, content, parts, agent_visible, user_visible) \
296 VALUES (?, ?, ?, '[]', 1, 0) RETURNING id",
297 )
298 .bind(conversation_id)
299 .bind(summary_role)
300 .bind(summary_content)
301 .fetch_one(&mut *tx)
302 .await?;
303
304 tx.commit().await?;
305
306 Ok(row.0)
307 }
308
309 pub async fn oldest_message_ids(
315 &self,
316 conversation_id: ConversationId,
317 n: u32,
318 ) -> Result<Vec<MessageId>, MemoryError> {
319 let rows: Vec<(MessageId,)> = sqlx::query_as(
320 "SELECT id FROM messages WHERE conversation_id = ? AND deleted_at IS NULL ORDER BY id ASC LIMIT ?",
321 )
322 .bind(conversation_id)
323 .bind(n)
324 .fetch_all(&self.pool)
325 .await?;
326 Ok(rows.into_iter().map(|r| r.0).collect())
327 }
328
329 pub async fn latest_conversation_id(&self) -> Result<Option<ConversationId>, MemoryError> {
335 let row: Option<(ConversationId,)> =
336 sqlx::query_as("SELECT id FROM conversations ORDER BY id DESC LIMIT 1")
337 .fetch_optional(&self.pool)
338 .await?;
339 Ok(row.map(|r| r.0))
340 }
341
342 pub async fn message_by_id(
348 &self,
349 message_id: MessageId,
350 ) -> Result<Option<Message>, MemoryError> {
351 let row: Option<(String, String, String, i64, i64)> = sqlx::query_as(
352 "SELECT role, content, parts, agent_visible, user_visible FROM messages WHERE id = ? AND deleted_at IS NULL",
353 )
354 .bind(message_id)
355 .fetch_optional(&self.pool)
356 .await?;
357
358 Ok(row.map(
359 |(role_str, content, parts_json, agent_visible, user_visible)| {
360 let parts = parse_parts_json(&role_str, &parts_json);
361 Message {
362 role: parse_role(&role_str),
363 content,
364 parts,
365 metadata: MessageMetadata {
366 agent_visible: agent_visible != 0,
367 user_visible: user_visible != 0,
368 compacted_at: None,
369 deferred_summary: None,
370 focus_pinned: false,
371 focus_marker_id: None,
372 },
373 }
374 },
375 ))
376 }
377
378 pub async fn messages_by_ids(
384 &self,
385 ids: &[MessageId],
386 ) -> Result<Vec<(MessageId, Message)>, MemoryError> {
387 if ids.is_empty() {
388 return Ok(Vec::new());
389 }
390
391 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
392
393 let query = format!(
394 "SELECT id, role, content, parts FROM messages \
395 WHERE id IN ({placeholders}) AND agent_visible = 1 AND deleted_at IS NULL"
396 );
397 let mut q = sqlx::query_as::<_, (MessageId, String, String, String)>(&query);
398 for &id in ids {
399 q = q.bind(id);
400 }
401
402 let rows = q.fetch_all(&self.pool).await?;
403
404 Ok(rows
405 .into_iter()
406 .map(|(id, role_str, content, parts_json)| {
407 let parts = parse_parts_json(&role_str, &parts_json);
408 (
409 id,
410 Message {
411 role: parse_role(&role_str),
412 content,
413 parts,
414 metadata: MessageMetadata::default(),
415 },
416 )
417 })
418 .collect())
419 }
420
421 pub async fn unembedded_message_ids(
427 &self,
428 limit: Option<usize>,
429 ) -> Result<Vec<(MessageId, ConversationId, String, String)>, MemoryError> {
430 let effective_limit = limit.map_or(i64::MAX, |l| i64::try_from(l).unwrap_or(i64::MAX));
431
432 let rows: Vec<(MessageId, ConversationId, String, String)> = sqlx::query_as(
433 "SELECT m.id, m.conversation_id, m.role, m.content \
434 FROM messages m \
435 LEFT JOIN embeddings_metadata em ON m.id = em.message_id \
436 WHERE em.id IS NULL AND m.deleted_at IS NULL \
437 ORDER BY m.id ASC \
438 LIMIT ?",
439 )
440 .bind(effective_limit)
441 .fetch_all(&self.pool)
442 .await?;
443
444 Ok(rows)
445 }
446
447 pub async fn count_messages(
453 &self,
454 conversation_id: ConversationId,
455 ) -> Result<i64, MemoryError> {
456 let row: (i64,) = sqlx::query_as(
457 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND deleted_at IS NULL",
458 )
459 .bind(conversation_id)
460 .fetch_one(&self.pool)
461 .await?;
462 Ok(row.0)
463 }
464
465 pub async fn count_messages_after(
471 &self,
472 conversation_id: ConversationId,
473 after_id: MessageId,
474 ) -> Result<i64, MemoryError> {
475 let row: (i64,) =
476 sqlx::query_as(
477 "SELECT COUNT(*) FROM messages WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL",
478 )
479 .bind(conversation_id)
480 .bind(after_id)
481 .fetch_one(&self.pool)
482 .await?;
483 Ok(row.0)
484 }
485
486 pub async fn keyword_search(
495 &self,
496 query: &str,
497 limit: usize,
498 conversation_id: Option<ConversationId>,
499 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
500 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
501 let safe_query = sanitize_fts5_query(query);
502 if safe_query.is_empty() {
503 return Ok(Vec::new());
504 }
505
506 let rows: Vec<(MessageId, f64)> = if let Some(cid) = conversation_id {
507 sqlx::query_as(
508 "SELECT m.id, -rank AS score \
509 FROM messages_fts f \
510 JOIN messages m ON m.id = f.rowid \
511 WHERE messages_fts MATCH ? AND m.conversation_id = ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
512 ORDER BY rank \
513 LIMIT ?",
514 )
515 .bind(&safe_query)
516 .bind(cid)
517 .bind(effective_limit)
518 .fetch_all(&self.pool)
519 .await?
520 } else {
521 sqlx::query_as(
522 "SELECT m.id, -rank AS score \
523 FROM messages_fts f \
524 JOIN messages m ON m.id = f.rowid \
525 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL \
526 ORDER BY rank \
527 LIMIT ?",
528 )
529 .bind(&safe_query)
530 .bind(effective_limit)
531 .fetch_all(&self.pool)
532 .await?
533 };
534
535 Ok(rows)
536 }
537
538 pub async fn keyword_search_with_time_range(
551 &self,
552 query: &str,
553 limit: usize,
554 conversation_id: Option<ConversationId>,
555 after: Option<&str>,
556 before: Option<&str>,
557 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
558 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
559 let safe_query = sanitize_fts5_query(query);
560 if safe_query.is_empty() {
561 return Ok(Vec::new());
562 }
563
564 let after_clause = if after.is_some() {
566 " AND m.created_at > ?"
567 } else {
568 ""
569 };
570 let before_clause = if before.is_some() {
571 " AND m.created_at < ?"
572 } else {
573 ""
574 };
575 let conv_clause = if conversation_id.is_some() {
576 " AND m.conversation_id = ?"
577 } else {
578 ""
579 };
580
581 let sql = format!(
582 "SELECT m.id, -rank AS score \
583 FROM messages_fts f \
584 JOIN messages m ON m.id = f.rowid \
585 WHERE messages_fts MATCH ? AND m.agent_visible = 1 AND m.deleted_at IS NULL\
586 {after_clause}{before_clause}{conv_clause} \
587 ORDER BY rank \
588 LIMIT ?"
589 );
590
591 let mut q = sqlx::query_as::<_, (MessageId, f64)>(&sql).bind(&safe_query);
592 if let Some(a) = after {
593 q = q.bind(a);
594 }
595 if let Some(b) = before {
596 q = q.bind(b);
597 }
598 if let Some(cid) = conversation_id {
599 q = q.bind(cid);
600 }
601 q = q.bind(effective_limit);
602
603 Ok(q.fetch_all(&self.pool).await?)
604 }
605
606 pub async fn message_timestamps(
614 &self,
615 ids: &[MessageId],
616 ) -> Result<std::collections::HashMap<MessageId, i64>, MemoryError> {
617 if ids.is_empty() {
618 return Ok(std::collections::HashMap::new());
619 }
620
621 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
622 let query = format!(
623 "SELECT id, COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \
624 FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
625 );
626 let mut q = sqlx::query_as::<_, (MessageId, i64)>(&query);
627 for &id in ids {
628 q = q.bind(id);
629 }
630
631 let rows = q.fetch_all(&self.pool).await?;
632 Ok(rows.into_iter().collect())
633 }
634
635 pub async fn load_messages_range(
641 &self,
642 conversation_id: ConversationId,
643 after_message_id: MessageId,
644 limit: usize,
645 ) -> Result<Vec<(MessageId, String, String)>, MemoryError> {
646 let effective_limit = i64::try_from(limit).unwrap_or(i64::MAX);
647
648 let rows: Vec<(MessageId, String, String)> = sqlx::query_as(
649 "SELECT id, role, content FROM messages \
650 WHERE conversation_id = ? AND id > ? AND deleted_at IS NULL \
651 ORDER BY id ASC LIMIT ?",
652 )
653 .bind(conversation_id)
654 .bind(after_message_id)
655 .bind(effective_limit)
656 .fetch_all(&self.pool)
657 .await?;
658
659 Ok(rows)
660 }
661
662 pub async fn get_eviction_candidates(
670 &self,
671 ) -> Result<Vec<crate::eviction::EvictionEntry>, crate::error::MemoryError> {
672 let rows: Vec<(MessageId, String, Option<String>, i64)> = sqlx::query_as(
673 "SELECT id, created_at, last_accessed, access_count \
674 FROM messages WHERE deleted_at IS NULL",
675 )
676 .fetch_all(&self.pool)
677 .await?;
678
679 Ok(rows
680 .into_iter()
681 .map(
682 |(id, created_at, last_accessed, access_count)| crate::eviction::EvictionEntry {
683 id,
684 created_at,
685 last_accessed,
686 access_count: access_count.try_into().unwrap_or(0),
687 },
688 )
689 .collect())
690 }
691
692 pub async fn soft_delete_messages(
700 &self,
701 ids: &[MessageId],
702 ) -> Result<(), crate::error::MemoryError> {
703 if ids.is_empty() {
704 return Ok(());
705 }
706 for &id in ids {
708 sqlx::query(
709 "UPDATE messages SET deleted_at = datetime('now') WHERE id = ? AND deleted_at IS NULL",
710 )
711 .bind(id)
712 .execute(&self.pool)
713 .await?;
714 }
715 Ok(())
716 }
717
718 pub async fn get_soft_deleted_message_ids(
724 &self,
725 ) -> Result<Vec<MessageId>, crate::error::MemoryError> {
726 let rows: Vec<(MessageId,)> = sqlx::query_as(
727 "SELECT id FROM messages WHERE deleted_at IS NOT NULL AND qdrant_cleaned = 0",
728 )
729 .fetch_all(&self.pool)
730 .await?;
731 Ok(rows.into_iter().map(|(id,)| id).collect())
732 }
733
734 pub async fn mark_qdrant_cleaned(
740 &self,
741 ids: &[MessageId],
742 ) -> Result<(), crate::error::MemoryError> {
743 for &id in ids {
744 sqlx::query("UPDATE messages SET qdrant_cleaned = 1 WHERE id = ?")
745 .bind(id)
746 .execute(&self.pool)
747 .await?;
748 }
749 Ok(())
750 }
751
752 pub async fn fetch_importance_scores(
760 &self,
761 ids: &[MessageId],
762 ) -> Result<std::collections::HashMap<MessageId, f64>, MemoryError> {
763 if ids.is_empty() {
764 return Ok(std::collections::HashMap::new());
765 }
766 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
767 let query = format!(
768 "SELECT id, importance_score FROM messages WHERE id IN ({placeholders}) AND deleted_at IS NULL"
769 );
770 let mut q = sqlx::query_as::<_, (MessageId, f64)>(&query);
771 for &id in ids {
772 q = q.bind(id);
773 }
774 let rows = q.fetch_all(&self.pool).await?;
775 Ok(rows.into_iter().collect())
776 }
777
778 pub async fn increment_access_counts(&self, ids: &[MessageId]) -> Result<(), MemoryError> {
786 if ids.is_empty() {
787 return Ok(());
788 }
789 let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
790 let query = format!(
791 "UPDATE messages SET access_count = access_count + 1, last_accessed = datetime('now') \
792 WHERE id IN ({placeholders})"
793 );
794 let mut q = sqlx::query(&query);
795 for &id in ids {
796 q = q.bind(id);
797 }
798 q.execute(&self.pool).await?;
799 Ok(())
800 }
801}
802
803#[cfg(test)]
804mod tests;